You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/06/10 06:37:23 UTC
svn commit: r1491333 [2/2] - in /oozie/trunk: ./
client/src/main/java/org/apache/oozie/client/event/message/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/command/wf/
core/src/main/java/org/apache/oozie/event/me...
Added: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.sla;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.AppType;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSLACalculatorMemory extends XDataTestCase {
+
+ private JPAService jpaService;
+
+ @Override
+ @Before
+ protected void setUp() throws Exception {
+ super.setUp();
+ Services services = new Services();
+ Configuration conf = services.getConf();
+ conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService,"
+ + "org.apache.oozie.sla.service.SLAService");
+ services.init();
+ jpaService = Services.get().get(JPAService.class);
+ cleanUpDBTables();
+ }
+
+ /**
+ * Destroys the services obtained through {@code Services.get()} and then runs
+ * the superclass teardown.
+ */
+ @Override
+ @After
+ protected void tearDown() throws Exception {
+ try {
+ Services.get().destroy();
+ }
+ finally {
+ // run the superclass cleanup even when destroying the services fails
+ super.tearDown();
+ }
+ }
+
+ @Test
+ public void testLoadOnRestart() throws Exception {
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+ SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ String jobId1 = slaRegBean1.getId();
+ SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+ String jobId2 = slaRegBean2.getId();
+ SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3", AppType.WORKFLOW_JOB);
+ String jobId3 = slaRegBean3.getId();
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
+ slaRegBean1.setAppName("app-name");
+ slaRegBean1.setExpectedDuration(123);
+ slaRegBean1.setExpectedEnd(sdf.parse("2012-02-07"));
+ slaRegBean1.setExpectedStart(sdf.parse("2011-02-07"));
+ slaRegBean1.setNominalTime(sdf.parse("2012-01-06"));
+ slaRegBean1.setUser("user");
+ slaRegBean1.setParentId("parentId");
+ slaRegBean1.setUpstreamApps("upstreamApps");
+ slaRegBean1.setNotificationMsg("notificationMsg");
+ slaRegBean1.setAlertContact("a@abc.com");
+ slaRegBean1.setAlertEvents("MISS");
+ slaRegBean1.setJobData("jobData");
+
+ slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+ slaCalcMemory.addRegistration(jobId2, slaRegBean2);
+ slaCalcMemory.addRegistration(jobId3, slaRegBean3);
+
+ SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
+ SLACalcStatus calc2 = slaCalcMemory.get(jobId2);
+ SLACalcStatus calc3 = slaCalcMemory.get(jobId3);
+
+ calc1.setEventProcessed(5);
+ calc2.setEventProcessed(6);
+ calc3.setEventProcessed(7);
+
+ calc1.setEventStatus(SLAEvent.EventStatus.END_MISS);
+ calc1.setSLAStatus(SLAEvent.SLAStatus.MISS);
+ calc1.setJobStatus(WorkflowJob.Status.FAILED.toString());
+ // set last modified time 5 days back
+ Date lastModifiedTime = new Date(System.currentTimeMillis() - 5*24*60*60*1000);
+ calc1.setLastModifiedTime(lastModifiedTime);
+
+ List<JsonBean> list = new ArrayList<JsonBean>();
+ SLASummaryBean bean = new SLASummaryBean(calc1);
+ bean.setActualStart(sdf.parse("2011-03-09"));
+ bean.setActualEnd(sdf.parse("2011-03-10"));
+ bean.setActualDuration(456);
+ list.add(bean);
+ list.add(new SLASummaryBean(calc2));
+ list.add(new SLASummaryBean(calc3));
+
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+ slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+
+ assertEquals(2, slaCalcMemory.size());
+
+ SLACalcStatus calc = slaCalcMemory.get(jobId1);
+ assertEquals("job-1", calc.getId());
+ assertEquals(AppType.WORKFLOW_JOB, calc.getAppType());
+ assertEquals("app-name", calc.getAppName());
+ assertEquals(123, calc.getExpectedDuration());
+ assertEquals(sdf.parse("2012-02-07"), calc.getExpectedEnd());
+ assertEquals(sdf.parse("2011-02-07"), calc.getExpectedStart());
+ assertEquals(sdf.parse("2012-01-06"), calc.getNominalTime());
+ assertEquals("user", calc.getUser());
+ assertEquals("parentId", calc.getParentId());
+ assertEquals("upstreamApps", calc.getUpstreamApps());
+ assertEquals("notificationMsg", calc.getNotificationMsg());
+ assertEquals("a@abc.com", calc.getAlertContact());
+ assertEquals("MISS", calc.getAlertEvents());
+ assertEquals("jobData", calc.getJobData());
+ assertEquals(sdf.parse("2011-03-09"), calc.getActualStart());
+ assertEquals(sdf.parse("2011-03-10"), calc.getActualEnd());
+ assertEquals(456, calc.getActualDuration());
+ assertEquals(SLAEvent.EventStatus.END_MISS, calc1.getEventStatus());
+ assertEquals(SLAEvent.SLAStatus.MISS, calc1.getSLAStatus());
+ assertEquals(WorkflowJob.Status.FAILED.toString(), calc1.getJobStatus());
+ assertEquals(lastModifiedTime, calc1.getLastModifiedTime());
+
+ assertEquals(5, calc.getEventProcessed());
+ assertEquals(6, slaCalcMemory.get(jobId2).getEventProcessed());
+ // jobId3 should be in history set as eventprocessed is 7 (111)
+ assertNull(slaCalcMemory.get(jobId3));
+ slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+ sdf.parse("2011-03-09"), sdf.parse("2011-04-09"));
+ SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId3));
+ assertEquals(8, slaSummary.getEventProcessed());
+ assertEquals(sdf.parse("2011-03-09"), slaSummary.getActualStart());
+ assertEquals(sdf.parse("2011-04-09"), slaSummary.getActualEnd());
+ assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
+ }
+
+ /**
+ * Registers a job whose expected start and expected end are both one hour in
+ * the past, then checks that a status update leaves two events in the queue
+ * with both misses recorded, and that the following RUNNING and SUCCEEDED
+ * statuses bring the queue to three events, store the actual times and remove
+ * the job from the in-memory map.
+ */
+ @Test
+ public void testSLAEvents() throws Exception {
+ final int hourMillis = 3600 * 1000;
+ SLACalculatorMemory calculator = new SLACalculatorMemory();
+ EventHandlerService eventHandler = Services.get().get(EventHandlerService.class);
+ calculator.init(new Configuration(false));
+
+ SLARegistrationBean registration = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ // expected start and expected end are both one hour back
+ registration.setExpectedStart(new Date(System.currentTimeMillis() - hourMillis));
+ registration.setExpectedDuration(2 * hourMillis);
+ registration.setExpectedEnd(new Date(System.currentTimeMillis() - hourMillis));
+ final String jobId = registration.getId();
+ calculator.addRegistration(jobId, registration);
+ assertEquals(1, calculator.size());
+ SLASummaryBean summary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ assertEquals(SLAStatus.NOT_STARTED, summary.getSLAStatus());
+
+ calculator.updateSlaStatus(jobId);
+ assertEquals(2, eventHandler.getEventQueue().size());
+ summary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // both start miss and end miss (101)
+ assertEquals(5, summary.getEventProcessed());
+ assertEquals(SLAEvent.EventStatus.END_MISS, summary.getEventStatus());
+ assertEquals(SLAStatus.MISS, summary.getSLAStatus());
+
+ SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
+
+ assertEquals(SLAStatus.MISS, summary.getSLAStatus());
+ calculator.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
+ dayFormat.parse("2012-01-01"), null);
+ calculator.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+ dayFormat.parse("2012-01-01"), dayFormat.parse("2012-01-02"));
+
+ assertEquals(3, eventHandler.getEventQueue().size());
+ summary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // All events processed and actual times stored (1000)
+ assertEquals(8, summary.getEventProcessed());
+ assertEquals(SLAStatus.MISS, summary.getSLAStatus());
+ assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), summary.getJobStatus());
+ assertEquals(SLAEvent.EventStatus.DURATION_MISS, summary.getEventStatus());
+ final long actualStartMillis = dayFormat.parse("2012-01-01").getTime();
+ final long actualEndMillis = dayFormat.parse("2012-01-02").getTime();
+ assertEquals(actualStartMillis, summary.getActualStart().getTime());
+ assertEquals(actualEndMillis, summary.getActualEnd().getTime());
+ assertEquals(actualEndMillis - actualStartMillis, summary.getActualDuration());
+ assertEquals(0, calculator.size());
+ }
+
+ /**
+ * Registers a job whose expected start is one hour back and whose expected
+ * end is one hour ahead, and checks that the stored event-processed value is 1
+ * both before and after the job reports RUNNING, with a single event in the
+ * queue at the end.
+ */
+ @Test
+ public void testDuplicateStartMiss() throws Exception {
+ final int hourMillis = 3600 * 1000;
+ EventHandlerService eventHandler = Services.get().get(EventHandlerService.class);
+ SLACalculatorMemory calculator = new SLACalculatorMemory();
+ calculator.init(new Configuration(false));
+
+ SLARegistrationBean registration = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ // expected start one hour back, expected end one hour ahead
+ registration.setExpectedStart(new Date(System.currentTimeMillis() - hourMillis));
+ registration.setExpectedDuration(hourMillis);
+ registration.setExpectedEnd(new Date(System.currentTimeMillis() + hourMillis));
+ final String jobId = registration.getId();
+ calculator.addRegistration(jobId, registration);
+
+ // first pass: start miss only
+ calculator.updateSlaStatus(jobId);
+ SLASummaryBean summary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ assertEquals(1, summary.getEventProcessed());
+ assertEquals(SLAStatus.NOT_STARTED, summary.getSLAStatus());
+
+ // the job starts now; a second pass must not change the processed value
+ calculator.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
+ new Date(), null);
+ calculator.updateSlaStatus(jobId);
+ summary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ assertEquals(1, summary.getEventProcessed());
+ assertEquals(SLAStatus.IN_PROCESS, summary.getSLAStatus());
+ assertEquals(WorkflowJob.Status.RUNNING.toString(), summary.getJobStatus());
+ assertEquals(1, eventHandler.getEventQueue().size());
+ }
+
+ /**
+ * Registers a job whose expected end is one hour back while its expected
+ * start is one hour ahead, and follows the stored event-processed value: 4
+ * after each of two status updates, 6 after a SUCCEEDED status and 8 after a
+ * late RUNNING status, at which point the in-memory map is empty and three
+ * events are queued.
+ */
+ @Test
+ public void testDuplicateEndMiss() throws Exception {
+ final int hourMillis = 3600 * 1000;
+ EventHandlerService eventHandler = Services.get().get(EventHandlerService.class);
+ SLACalculatorMemory calculator = new SLACalculatorMemory();
+ calculator.init(new Configuration(false));
+
+ SLARegistrationBean registration = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ // expected start one hour ahead, expected end one hour back
+ registration.setExpectedStart(new Date(System.currentTimeMillis() + hourMillis));
+ registration.setExpectedDuration(hourMillis);
+ registration.setExpectedEnd(new Date(System.currentTimeMillis() - hourMillis));
+ final String jobId = registration.getId();
+ calculator.addRegistration(jobId, registration);
+
+ calculator.updateSlaStatus(jobId);
+ SLASummaryBean summary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // Only end sla should be processed (100)
+ assertEquals(4, summary.getEventProcessed());
+
+ // a second pass must leave the processed value unchanged
+ calculator.updateSlaStatus(jobId);
+ summary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ assertEquals(4, summary.getEventProcessed());
+ assertEquals(SLAStatus.MISS, summary.getSLAStatus());
+
+ calculator.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+ new Date(), new Date(System.currentTimeMillis() + hourMillis));
+ summary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // Only duration sla should be processed as end is already processed (110)
+ assertEquals(6, summary.getEventProcessed());
+ assertEquals(SLAStatus.MISS, summary.getSLAStatus());
+
+ // Receive start event
+ assertTrue(calculator.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
+ new Date(), new Date(System.currentTimeMillis() + hourMillis)));
+ summary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // Start event received so everything is processed (1000)
+ assertEquals(8, summary.getEventProcessed());
+ assertEquals(SLAStatus.MISS, summary.getSLAStatus());
+ assertEquals(0, calculator.size());
+ assertEquals(3, eventHandler.getEventQueue().size());
+
+ }
+
+ /**
+ * Registers a job whose expected start and end are one hour back, reports it
+ * RUNNING and checks that it has left the in-memory map with a stored
+ * event-processed value of 7; a later SUCCEEDED status must raise the stored
+ * value to 8 with three events queued.
+ */
+ @Test
+ public void testSLAHistorySet() throws Exception {
+ EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+ SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ // expected start and expected end are both one hour back
+ Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000);
+ slaRegBean.setExpectedStart(startTime);
+ slaRegBean.setExpectedDuration(1000);
+ slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
+ String jobId = slaRegBean.getId();
+ slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+ slaCalcMemory.updateSlaStatus(jobId);
+ slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
+ System.currentTimeMillis() - 3600 * 1000), null);
+ slaCalcMemory.updateSlaStatus(jobId);
+ SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // The actual end times are not stored, but sla's processed so (111)
+ assertEquals(7, slaSummary.getEventProcessed());
+ // Moved from map to history set
+ assertEquals(0, slaCalcMemory.size());
+ // Add terminal state event so actual end time is stored
+ slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, new Date(
+ System.currentTimeMillis() - 3600 * 1000), new Date(System.currentTimeMillis()));
+ slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // The actual times are stored, so event processed (1000)
+ assertEquals(8, slaSummary.getEventProcessed());
+ assertEquals(3, ehs.getEventQueue().size());
+
+ }
+
+ private SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
+ SLARegistrationBean bean = new SLARegistrationBean();
+ bean.setJobId(jobId);
+ bean.setAppType(appType);
+ return bean;
+ }
+
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Mon Jun 10 04:37:20 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
@@ -36,11 +37,15 @@ import org.apache.oozie.command.coord.Co
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.wf.KillXCommand;
+import org.apache.oozie.command.wf.ReRunXCommand;
import org.apache.oozie.command.wf.StartXCommand;
import org.apache.oozie.command.wf.SubmitXCommand;
import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -121,6 +126,162 @@ public class TestSLAEventGeneration exte
_testWorkflowJobCommands(conf, ehs, slas, true);
}
+ /**
+ * Submits a workflow job with SLA, reruns it with a new nominal time and checks that the
+ * SLA entry follows the new conf, the summary is reset and START_MISS is reported each time.
+ * @throws Exception if a command or JPA executor call fails
+ */
+ @Test
+ public void testWorkflowJobSLARerun() throws Exception {
+ EventHandlerService ehs = services.get(EventHandlerService.class);
+ SLAService slas = services.get(SLAService.class);
+
+ String wfXml = IOUtils.getResourceAsString("wf-job-sla.xml", -1);
+ Path appPath = getFsTestCaseDir();
+ writeToFile(wfXml, appPath, "workflow.xml");
+ Configuration conf = new XConfiguration();
+ conf.set(OozieClient.APP_PATH, appPath.toString());
+ conf.set(OozieClient.USER_NAME, getTestUser());
+
+ cal.setTime(new Date());
+ cal.add(Calendar.MINUTE, -40); // nominal time 40 minutes back, for start_miss
+ Date nominal = cal.getTime();
+ String nominalTime = DateUtils.formatDateOozieTZ(nominal);
+ conf.set("nominal_time", nominalTime);
+ cal.setTime(nominal);
+ cal.add(Calendar.MINUTE, 10); // as per the sla xml
+ String expectedStart = DateUtils.formatDateOozieTZ(cal.getTime());
+ cal.setTime(nominal);
+ cal.add(Calendar.MINUTE, 30); // as per the sla xml
+ String expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime());
+
+ // Call SubmitX
+ SubmitXCommand sc = new SubmitXCommand(conf, "UNIT_TESTING");
+ String jobId = sc.call();
+ SLACalcStatus slaEvent = slas.getSLACalculator().get(jobId);
+ assertEquals(jobId, slaEvent.getId());
+ assertEquals("test-wf-job-sla", slaEvent.getAppName());
+ assertEquals(AppType.WORKFLOW_JOB, slaEvent.getAppType());
+ assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime()));
+ assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart()));
+ assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
+
+ slas.runSLAWorker();
+ slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
+ assertEquals(SLAStatus.NOT_STARTED, slaEvent.getSLAStatus());
+ assertEquals(EventStatus.START_MISS, slaEvent.getEventStatus());
+ slas.getSLACalculator().clear();
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ WorkflowJobBean wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+ // set job status to succeeded, so rerun doesn't fail
+ wfBean.setStatus(WorkflowJob.Status.SUCCEEDED);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean));
+
+ // change conf for rerun
+ cal.setTime(new Date());
+ cal.add(Calendar.MINUTE, -20); // nominal time 20 minutes back, for start_miss
+ nominalTime = DateUtils.formatDateOozieTZ(cal.getTime());
+
+ conf.set("nominal_time", nominalTime);
+ nominal = cal.getTime();
+ cal.add(Calendar.MINUTE, 10); // as per the sla xml
+ expectedStart = DateUtils.formatDateOozieTZ(cal.getTime());
+ cal.setTime(nominal);
+ cal.add(Calendar.MINUTE, 30); // as per the sla xml
+ expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime());
+
+ ReRunXCommand rerun = new ReRunXCommand(jobId, conf, "UNIT_TESTING");
+ rerun.call();
+ slaEvent = slas.getSLACalculator().get(jobId);
+ // the calculator was cleared above, so the rerun must have added the entry again
+ assertNotNull(slaEvent);
+ assertEquals(jobId, slaEvent.getId());
+ assertEquals("test-wf-job-sla", slaEvent.getAppName());
+ assertEquals(AppType.WORKFLOW_JOB, slaEvent.getAppType());
+
+ // assert for new conf
+ assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime()));
+ assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart()));
+ assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
+
+ // assert for values in summary bean to be reset
+ SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ assertEquals( 0, slaSummary.getEventProcessed());
+ assertEquals(-1, slaSummary.getActualDuration());
+ assertNull(slaSummary.getActualStart());
+ assertNull(slaSummary.getActualEnd());
+ assertNull(slaSummary.getJobStatus());
+ assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
+ assertNull(slaEvent.getEventStatus());
+
+ ehs.getEventQueue().clear(); // discard events queued so far
+ slas.runSLAWorker();
+ slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
+ assertEquals(SLAStatus.NOT_STARTED, slaEvent.getSLAStatus());
+ assertEquals(EventStatus.START_MISS, slaEvent.getEventStatus());
+
+ }
+
+ /**
+ * Submits a workflow with an action-level SLA, reruns it with a new nominal time and checks
+ * that the SLA calculator holds an entry for the action built from the rerun conf.
+ * @throws Exception if a command or JPA executor call fails
+ */
+ @Test
+ public void testWorkflowActionSLARerun() throws Exception {
+ SLAService slas = services.get(SLAService.class);
+ String wfXml = IOUtils.getResourceAsString("wf-action-sla.xml", -1);
+ Path appPath = getFsTestCaseDir();
+ writeToFile(wfXml, appPath, "workflow.xml");
+ Configuration conf = new XConfiguration();
+ conf.set(OozieClient.APP_PATH, appPath.toString());
+ conf.set(OozieClient.USER_NAME, getTestUser());
+
+ cal.setTime(new Date());
+ cal.add(Calendar.MINUTE, -20); // nominal time 20 minutes back, for start_miss
+ Date nominal = cal.getTime();
+ String nominalTime = DateUtils.formatDateOozieTZ(nominal);
+ conf.set("nominal_time", nominalTime);
+
+ // Call SubmitX
+ SubmitXCommand sc = new SubmitXCommand(conf, "UNIT_TESTING");
+ String jobId = sc.call();
+ String actionId = jobId+"@grouper"; // presumably the action named "grouper" in wf-action-sla.xml - confirm
+
+ slas.getSLACalculator().clear();
+ JPAService jpaService = Services.get().get(JPAService.class);
+ WorkflowJobBean wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+ // set job status to succeeded, so rerun doesn't fail
+ wfBean.setStatus(WorkflowJob.Status.SUCCEEDED);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean));
+
+ // change conf for rerun: the new nominal time is the current time
+ cal.setTime(new Date());
+ nominalTime = DateUtils.formatDateOozieTZ(cal.getTime());
+ conf.set("nominal_time", nominalTime);
+ nominal = cal.getTime();
+ cal.add(Calendar.MINUTE, 10); // as per the sla xml
+ String expectedStart = DateUtils.formatDateOozieTZ(cal.getTime());
+ cal.setTime(nominal);
+ cal.add(Calendar.MINUTE, 30); // as per the sla xml
+ String expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime());
+
+ ReRunXCommand rerun = new ReRunXCommand(jobId, conf, "UNIT_TESTING");
+ rerun.call();
+ SLACalcStatus slaEvent = slas.getSLACalculator().get(actionId);
+ assertNotNull(slaEvent);
+ // assert for action configs
+ assertEquals(actionId, slaEvent.getId());
+ assertEquals("test-wf-action-sla", slaEvent.getAppName());
+ assertEquals(AppType.WORKFLOW_ACTION, slaEvent.getAppType());
+ // assert for new conf
+ assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime()));
+ assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart()));
+ assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
+
+ }
+
@Test
public void testSLASchema1BackwardCompatibility() throws Exception {
EventHandlerService ehs = services.get(EventHandlerService.class);
@@ -192,6 +353,7 @@ public class TestSLAEventGeneration exte
ehs.getEventQueue().poll(); //ignore the wf-action event generated
ehs.new EventWorker().run();
Thread.sleep(300); // time for listeners to run
+ ehs.getEventQueue().poll(); // ignore duration event
slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
assertEquals(jobId, slaEvent.getId());
assertNotNull(slaEvent.getActualEnd());
@@ -278,6 +440,7 @@ public class TestSLAEventGeneration exte
// test that sla processes the Job Event from Kill command
new CoordKillXCommand(jobId).call();
ehs.new EventWorker().run();
+ ehs.getEventQueue().poll(); //ignore duration event
slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
assertEquals(actionId, slaEvent.getId());
assertNotNull(slaEvent.getActualEnd());
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java Mon Jun 10 04:37:20 2013
@@ -91,8 +91,8 @@ public class TestSLAJobEventListener ext
SLACalcStatus serviceObj = slas.getSLACalculator().get("wf1");
// check that start sla has been calculated
assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus());
- assertEquals(0, serviceObj.getSlaProcessed()); //Job switching to running is only partially
- //sla processed. so state = zero
+ assertEquals(1, serviceObj.getEventProcessed()); //Job switching to running is only partially
+ //sla processed. so state = 1
job = _createSLARegBean("wa1", AppType.WORKFLOW_ACTION);
slas.addRegistrationEvent(job);
@@ -114,24 +114,27 @@ public class TestSLAJobEventListener ext
"coord-app-name1", actualStart, actualEnd);
listener.onCoordinatorJobEvent(cje);
- // Since serviceObj is removed from memory after END stage
SLASummaryBean summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("cj1"));
- // check that end sla has been calculated
- assertEquals(2, summary.getSlaProcessed()); //Job in terminal state has finished
- //sla processing. so state = 2
+ // check that end and duration sla has been calculated
+ assertEquals(6, summary.getEventProcessed());
+
assertEquals(EventStatus.END_MET, summary.getEventStatus());
job = _createSLARegBean("ca1", AppType.COORDINATOR_ACTION);
actualEnd = DateUtils.parseDateUTC("2012-07-22T02:00Z");
slas.addRegistrationEvent(job);
- assertEquals(3, slas.getSLACalculator().size());
- CoordinatorActionEvent cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.KILLED, "user1",
+ assertEquals(4, slas.getSLACalculator().size());
+ CoordinatorActionEvent cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.RUNNING, "user1",
+ "coord-app-name1", null, actualEnd, null);
+ listener.onCoordinatorActionEvent(cae);
+ cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.KILLED, "user1",
"coord-app-name1", null, actualEnd, null);
listener.onCoordinatorActionEvent(cae);
summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("ca1"));
- // check that start sla has been calculated
- assertEquals(2, summary.getSlaProcessed());
+ // check that all events are processed
+ assertEquals(8, summary.getEventProcessed());
assertEquals(EventStatus.END_MISS, summary.getEventStatus());
+ assertEquals(3, slas.getSLACalculator().size());
}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.sla;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Checks that an {@link SLARegistrationBean} written through
+ * {@link SLACalculationInsertUpdateJPAExecutor} can be read back through
+ * {@link SLARegistrationGetOnRestartJPAExecutor}.
+ */
+public class TestSLARegistrationGetRecordsOnRestartJPAExecutor extends XDataTestCase {
+ Services services;
+
+ /**
+ * Starts a new {@link Services} instance and invokes {@code cleanUpDBTables()}.
+ */
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.init();
+ cleanUpDBTables();
+ }
+
+ /**
+ * Destroys the services created in {@link #setUp()} and then runs the
+ * superclass teardown.
+ */
+ @Override
+ protected void tearDown() throws Exception {
+ try {
+ services.destroy();
+ }
+ finally {
+ // run the superclass cleanup even when destroying the services fails
+ super.tearDown();
+ }
+ }
+
+ /**
+ * Inserts one registration and verifies the notification message, upstream
+ * apps, alert events, alert contact and job data returned by the on-restart
+ * executor for the same job id.
+ */
+ public void testSLARegistrationGetRecordsOnRestart() throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ // check the service before it is used for the insert and the read
+ assertNotNull(jpaService);
+
+ Date current = new Date();
+ final String jobId = "0000000-" + current.getTime() + "-TestSLARegGetRestartJPAExecutor-W";
+ SLARegistrationBean reg = new SLARegistrationBean();
+ reg.setJobId(jobId);
+ reg.setNotificationMsg("dummyMessage");
+ reg.setUpstreamApps("upApps");
+ reg.setAlertEvents("miss");
+ reg.setAlertContact("abc@y.com");
+ reg.setJobData("jobData");
+
+ List<JsonBean> insert = new ArrayList<JsonBean>();
+ insert.add(reg);
+ SLACalculationInsertUpdateJPAExecutor slaInsertCmd = new SLACalculationInsertUpdateJPAExecutor(insert, null);
+ jpaService.execute(slaInsertCmd);
+
+ SLARegistrationGetOnRestartJPAExecutor readCmd = new SLARegistrationGetOnRestartJPAExecutor(jobId);
+ SLARegistrationBean bean = jpaService.execute(readCmd);
+ // fail with a clear assertion instead of a NullPointerException below
+ assertNotNull(bean);
+ assertEquals("dummyMessage", bean.getNotificationMsg());
+ assertEquals("upApps", bean.getUpstreamApps());
+ assertEquals("miss", bean.getAlertEvents());
+ assertEquals("abc@y.com", bean.getAlertContact());
+ assertEquals("jobData", bean.getJobData());
+ }
+
+}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java?rev=1491333&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.sla;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.oozie.AppType;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Tests {@link SLASummaryGetRecordsOnRestartJPAExecutor}: of the summary records
+ * stored through {@link SLACalculationInsertUpdateJPAExecutor}, only the one whose
+ * last-modified time lies inside the requested number of days is expected back.
+ */
+public class TestSLASummaryGetOnRestartJPAExecutor extends XDataTestCase {
+ Services services;
+
+ /**
+ * Runs the parent set-up, then creates and initialises a new {@link Services}
+ * instance and calls {@code cleanUpDBTables()}.
+ *
+ * @throws Exception if any of the set-up steps fails
+ */
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.init();
+ cleanUpDBTables();
+ }
+
+ /**
+ * Destroys the {@link Services} instance and then runs the parent tear-down.
+ *
+ * @throws Exception if the parent tear-down fails
+ */
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ /**
+ * Stores two {@link SLASummaryBean}s, one modified 5 days ago and one modified in
+ * 2009, then asks for the records of the last 7 days and checks that only the
+ * recent one comes back with its fields intact.
+ * <p>
+ * NOTE(review): the method name mentions "SLARegistration" although the test
+ * exercises summary retrieval; the name is kept unchanged here.
+ *
+ * @throws Exception if a JPA command or the date parsing fails
+ */
+ public void testSLARegistrationGet() throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ // check the service before its first use, so a missing JPAService fails this
+ // assertion instead of surfacing as a NullPointerException further down
+ assertNotNull(jpaService);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+
+ SLASummaryBean sla1 = new SLASummaryBean();
+ sla1.setJobId("jobId");
+ sla1.setAppName("appName");
+ sla1.setUser("user");
+ sla1.setParentId("parent");
+ sla1.setEventProcessed(7);
+ // set to 5 days back from now; the long literal keeps the millisecond
+ // arithmetic from overflowing int if the day count is ever raised
+ sla1.setLastModifiedTime(new Date(System.currentTimeMillis() - 5L * 24 * 60 * 60 * 1000));
+
+ SLASummaryBean sla2 = new SLASummaryBean();
+ sla2.setJobId("jobId2");
+ sla2.setEventProcessed(6);
+ // set to long time back
+ sla2.setLastModifiedTime(sdf.parse("2009-06-03"));
+
+ List<JsonBean> insert = new ArrayList<JsonBean>();
+ insert.add(sla1);
+ insert.add(sla2);
+ SLACalculationInsertUpdateJPAExecutor slaInsertCmd = new SLACalculationInsertUpdateJPAExecutor(insert, null);
+ jpaService.execute(slaInsertCmd);
+
+ // get all records modified in last 7 days
+ SLASummaryGetRecordsOnRestartJPAExecutor slaGetOnRestart = new SLASummaryGetRecordsOnRestartJPAExecutor(7);
+ List<SLASummaryBean> beans = jpaService.execute(slaGetOnRestart);
+ // fail with an assertion rather than a NullPointerException if no list was returned
+ assertNotNull(beans);
+ assertEquals(1, beans.size());
+ SLASummaryBean found = beans.get(0);
+ assertEquals("jobId", found.getJobId());
+ assertEquals("appName", found.getAppName());
+ assertEquals("user", found.getUser());
+ assertEquals("parent", found.getParentId());
+ assertEquals(7, found.getEventProcessed());
+ }
+
+}
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1491333&r1=1491332&r2=1491333&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Mon Jun 10 04:37:20 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1339 Implement SLA Bootstrap Service and fix bugs in SLACalculator (virag)
OOZIE-1400 REST API to fetch SLA (rohini)
OOZIE-1375 Generate Job notification events for Workflow Actions (mona)
OOZIE-1357 Can't view more than 1000 actions of a coordinator and paging does not work (ryota)