You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/06/21 04:05:57 UTC
svn commit: r1495272 [2/2] - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/command/wf/ core/src/main/java/org...
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java Fri Jun 21 02:05:56 2013
@@ -24,11 +24,20 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
@@ -159,8 +168,208 @@ public class TestSLACalculatorMemory ext
assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
}
+ /** Restart scenario for a workflow job: each new SLACalculatorMemory.init() must reconcile the stored SLA summary with the job row. */
+ @Test
+ public void testWorkflowJobSLAStatusOnRestart() throws Exception {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // MM = month; lowercase mm would parse minutes
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+ SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ String jobId1 = slaRegBean1.getId();
+ slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
+ slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
+ slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+ SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
+ calc1.setEventProcessed(1);
+ calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+ calc1.setJobStatus(WorkflowJob.Status.RUNNING.name());
+ calc1.setLastModifiedTime(new Date());
+ SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
+ // Store the summary as RUNNING / IN_PROCESS, i.e. the state recorded before the restart
+ List<JsonBean> list = new ArrayList<JsonBean>();
+ list.add(slaSummaryBean);
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+ SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ // NOTE(review): slaSummary is reassigned below before it is read; this first fetch is never inspected
+ // Simulate a lost success event
+ WorkflowJobBean wjb = new WorkflowJobBean();
+ wjb.setId(jobId1);
+ wjb.setStatus(WorkflowJob.Status.SUCCEEDED);
+ wjb.setStartTime(sdf.parse("2012-02-07"));
+ wjb.setEndTime(sdf.parse("2013-02-07"));
+ WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(wjb);
+ jpaService.execute(wfInsertCmd);
+ // A fresh calculator plus init() plays the role of the restarted server
+ slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+
+ // As job succeeded, it should not be in memory
+ assertEquals(0, slaCalcMemory.size());
+ slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ assertEquals("job-1", slaSummary.getJobId());
+ assertEquals(8, slaSummary.getEventProcessed());
+ assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType());
+ assertEquals("SUCCEEDED", slaSummary.getJobStatus());
+ assertEquals(SLAEvent.SLAStatus.MET, slaSummary.getSLAStatus());
+ assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+ assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
+ assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(),
+ slaSummary.getActualDuration());
+
+ // Simulate a lost failed event
+ wjb.setStatus(WorkflowJob.Status.FAILED);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
+
+ // Reset the summary Bean
+ calc1.setEventProcessed(1);
+ calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+ calc1.setJobStatus(WorkflowJob.Status.RUNNING.name());
+ slaSummaryBean = new SLASummaryBean(calc1);
+
+ list = new ArrayList<JsonBean>();
+ list.add(slaSummaryBean);
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+ slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+
+ assertEquals(0, slaCalcMemory.size());
+ slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ assertEquals("FAILED", slaSummary.getJobStatus());
+ assertEquals(8, slaSummary.getEventProcessed());
+ assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+ assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
+ assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus());
+
+ // Simulate a lost RUNNING event
+ wjb.setStatus(WorkflowJob.Status.RUNNING);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
+
+ // Reset the summary Bean
+ calc1.setEventProcessed(0);
+ calc1.setSLAStatus(SLAEvent.SLAStatus.NOT_STARTED);
+ calc1.setJobStatus(null);
+ slaSummaryBean = new SLASummaryBean(calc1);
+
+ list = new ArrayList<JsonBean>();
+ list.add(slaSummaryBean);
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+ slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+ // The job is RUNNING this time, so the test expects it to remain tracked in memory
+ assertEquals(1, slaCalcMemory.size());
+ SLACalcStatus calc = slaCalcMemory.get(jobId1);
+ assertEquals(1, calc.getEventProcessed());
+ assertEquals("RUNNING", calc.getJobStatus());
+ assertEquals(sdf.parse("2012-02-07"), calc.getActualStart());
+ assertNull(calc.getActualEnd());
+ assertEquals(-1, calc.getActualDuration());
+ assertEquals(SLAEvent.SLAStatus.IN_PROCESS, calc.getSLAStatus());
+ }
+ /** Restart scenario for a workflow action: init() must pick up an action that ended OK and close its SLA summary as MET. */
+ @Test
+ public void testWorkflowActionSLAStatusOnRestart() throws Exception {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // MM = month; lowercase mm would parse minutes
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+ SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_ACTION);
+ String jobId1 = slaRegBean1.getId();
+ slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
+ slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
+ slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+ SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
+ calc1.setEventProcessed(1);
+ calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+ calc1.setJobStatus(WorkflowAction.Status.RUNNING.name());
+ calc1.setLastModifiedTime(new Date());
+ SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
+ // Store the summary as RUNNING / IN_PROCESS, i.e. the state recorded before the restart
+ List<JsonBean> list = new ArrayList<JsonBean>();
+ list.add(slaSummaryBean);
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+ // Simulate a lost success event
+ WorkflowActionBean wab = new WorkflowActionBean();
+ wab.setId(jobId1);
+ wab.setStatus(WorkflowAction.Status.OK);
+ wab.setStartTime(sdf.parse("2012-02-07"));
+ wab.setEndTime(sdf.parse("2013-02-07"));
+ WorkflowActionInsertJPAExecutor wfInsertCmd = new WorkflowActionInsertJPAExecutor(wab);
+ jpaService.execute(wfInsertCmd);
+ // A fresh calculator plus init() plays the role of the restarted server
+ slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+
+ // As the action succeeded (status OK), it should not be in memory
+ assertEquals(0, slaCalcMemory.size());
+ SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ assertEquals("job-1", slaSummary.getJobId());
+ assertEquals(8, slaSummary.getEventProcessed());
+ assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType());
+ assertEquals("OK", slaSummary.getJobStatus());
+ assertEquals(SLAEvent.SLAStatus.MET, slaSummary.getSLAStatus());
+ assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+ assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
+ assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(),
+ slaSummary.getActualDuration());
+
+ }
+ /** Restart scenario for a coordinator action: init() must pick up a FAILED action and close its SLA summary as MISS. */
+ @Test
+ public void testCoordinatorActionSLAStatusOnRestart() throws Exception {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // MM = month; lowercase mm would parse minutes
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+ SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.COORDINATOR_ACTION);
+ String jobId1 = slaRegBean1.getId();
+ slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
+ slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
+ slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+ SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
+ calc1.setEventProcessed(1);
+ calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+ calc1.setJobStatus(CoordinatorAction.Status.RUNNING.name());
+ calc1.setLastModifiedTime(new Date());
+ SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
+ // Store the summary as RUNNING / IN_PROCESS, i.e. the state recorded before the restart
+ List<JsonBean> list = new ArrayList<JsonBean>();
+ list.add(slaSummaryBean);
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+ // Simulate a lost failed event
+ CoordinatorActionBean cab = new CoordinatorActionBean();
+ cab.setId(jobId1);
+ cab.setStatus(CoordinatorAction.Status.FAILED);
+ cab.setLastModifiedTime(sdf.parse("2013-02-07"));
+ cab.setExternalId("wf_job");
+ CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(cab);
+ jpaService.execute(caInsertCmd);
+ WorkflowJobBean wjb = new WorkflowJobBean();
+ wjb.setId("wf_job");
+ wjb.setStartTime(sdf.parse("2012-02-07"));
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
+ // Assertions below expect actual start = the workflow job's start time, actual end = the action's last-modified time
+ slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+
+ // As the action failed (a terminal state), it should not be in memory
+ assertEquals(0, slaCalcMemory.size());
+ SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ assertEquals("job-1", slaSummary.getJobId());
+ assertEquals(8, slaSummary.getEventProcessed());
+ assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType());
+ assertEquals("FAILED", slaSummary.getJobStatus());
+ assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus());
+ assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+ assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
+ assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(),
+ slaSummary.getActualDuration());
+ }
+
@Test
- public void testSLAEvents() throws Exception {
+ public void testSLAEvents1() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
slaCalcMemory.init(new Configuration(false));
@@ -174,6 +383,7 @@ public class TestSLACalculatorMemory ext
assertEquals(1, slaCalcMemory.size());
SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
+ assertEquals("PREP", slaSummary.getJobStatus());
slaCalcMemory.updateSlaStatus(jobId);
assertEquals(2, ehs.getEventQueue().size());
slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
@@ -187,6 +397,11 @@ public class TestSLACalculatorMemory ext
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
sdf.parse("2012-01-01"), null);
+ slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUSPENDED.toString(), EventStatus.SUSPEND,
+ sdf.parse("2012-01-01"), null);
+ slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ assertEquals(WorkflowJob.Status.SUSPENDED.toString(), slaSummary.getJobStatus());
+ assertEquals(5, slaSummary.getEventProcessed());
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
@@ -205,6 +420,67 @@ public class TestSLACalculatorMemory ext
}
@Test
+ public void testSLAEvents2() throws Exception { // event-processed bits and queued events when no expected duration is set
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+ slaCalcMemory.init(new Configuration(false));
+ SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
+ slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000));
+ // Expected start is one hour in the past, expected end two hours in the future
+ String jobId = slaRegBean.getId();
+ slaCalcMemory.addRegistration(jobId, slaRegBean);
+ assertEquals(1, slaCalcMemory.size());
+ slaCalcMemory.updateSlaStatus(jobId);
+ SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // Duration bit should be processed as expected duration is not set
+ assertEquals(3, slaSummary.getEventProcessed());
+ // check only start event in queue
+ assertEquals(1, ehs.getEventQueue().size());
+ ehs.getEventQueue().clear();
+
+ // set back to 1, to make duration event not processed
+ slaSummary.setEventProcessed(1);
+ List<JsonBean> list = new ArrayList<JsonBean>();
+ list.add(slaSummary);
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+ sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
+ slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // all should be processed
+ assertEquals(8, slaSummary.getEventProcessed());
+ // check only end event is in queue
+ assertEquals(1, ehs.getEventQueue().size());
+ ehs.getEventQueue().clear();
+
+ slaSummary.setEventProcessed(1);
+ list = new ArrayList<JsonBean>();
+ list.add(slaSummary);
+ jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));
+ // NOTE(review): this reset of job-1's summary is not asserted on again; the rest of the test uses job-2
+ slaRegBean = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+ slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
+ slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000));
+
+ jobId = slaRegBean.getId();
+ slaCalcMemory.addRegistration(jobId, slaRegBean);
+ assertEquals(1, slaCalcMemory.size());
+ // Size is 1 again after adding job-2; presumably job-1 was dropped once its SUCCEEDED status was processed
+ slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null,
+ sdf.parse("2012-01-02"));
+ slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ // Actual start null, so all events processed
+ assertEquals(8, slaSummary.getEventProcessed());
+ assertEquals(1, ehs.getEventQueue().size());
+ assertNull(slaSummary.getActualStart());
+ assertEquals(sdf.parse("2012-01-02"), slaSummary.getActualEnd());
+ assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+ assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
+ }
+
+ @Test
public void testDuplicateStartMiss() throws Exception {
// test start-miss
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Fri Jun 21 02:05:56 2013
@@ -216,7 +216,7 @@ public class TestSLAEventGeneration exte
assertEquals(-1, slaSummary.getActualDuration());
assertNull(slaSummary.getActualStart());
assertNull(slaSummary.getActualEnd());
- assertNull(slaSummary.getJobStatus());
+ assertEquals("PREP", slaSummary.getJobStatus());
assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
assertNull(slaEvent.getEventStatus());
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java Fri Jun 21 02:05:56 2013
@@ -128,19 +128,20 @@ public class TestSLAService extends XDat
assertTrue(output.toString().contains(sla2.getId() + " Sla START - MISS!!!"));
assertTrue(output.toString().contains(sla2.getId() + " Sla END - MISS!!!"));
output.setLength(0);
-
+ // As expected duration is not set, duration shall be processed and job removed from map
+ assertEquals(2, slas.getSLACalculator().size());
// test same job multiple events (start-met, end-met) through job status event
sla1 = _createSLARegistration("action-1", AppType.COORDINATOR_ACTION);
sla1.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead
sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 3600 * 1000)); //2 hours ahead
slas.addRegistrationEvent(sla1);
- assertEquals(4, slas.getSLACalculator().size());
+ assertEquals(3, slas.getSLACalculator().size());
slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
new Date());
slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS,
new Date(), new Date());
slas.runSLAWorker();
- assertEquals(3, ehs.getEventQueue().size());
+ assertEquals(2, ehs.getEventQueue().size());
ehs.new EventWorker().run();
assertTrue(output.toString().contains(sla1.getId() + " Sla START - MET!!!"));
assertTrue(output.toString().contains(sla1.getId() + " Sla END - MET!!!"));
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Fri Jun 21 02:05:56 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1424 Improve SLA reliability on restart, fix bugs related to SLA and event generation (virag)
OOZIE-1417 Exlude **/oozie/store/* **/oozie/examples/* from clover reports (dennisyv via virag)
OOZIE-1426 Fix bugs in SLA UI (rohini)
OOZIE-1421 UI for SLA (virag, rohini)