You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/05/17 22:08:45 UTC
[1/3] oozie git commit: OOZIE-2509 SLA job status can stuck in
running state
Repository: oozie
Updated Branches:
refs/heads/master 5fbd3eb3f -> ba7a7b85e
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
index 432efef..559e2b3 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
@@ -108,11 +108,11 @@ public class TestSLACalculatorMemory extends XDataTestCase {
public void testLoadOnRestart() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
- SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
String jobId1 = slaRegBean1.getId();
- SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+ SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2-W", AppType.WORKFLOW_JOB);
String jobId2 = slaRegBean2.getId();
- SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3", AppType.WORKFLOW_JOB);
+ SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3-W", AppType.WORKFLOW_JOB);
String jobId3 = slaRegBean3.getId();
List<String> idList = new ArrayList<String>();
idList.add(slaRegBean1.getId());
@@ -134,6 +134,12 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaRegBean1.setAlertEvents("MISS");
slaRegBean1.setJobData("jobData");
+ Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back
+ Date endTime = new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000); // 2 hours ahead
+
+ slaRegBean3.setExpectedStart(startTime);
+ slaRegBean3.setExpectedEnd(endTime);
+
slaCalcMemory.addRegistration(jobId1, slaRegBean1);
slaCalcMemory.addRegistration(jobId2, slaRegBean2);
slaCalcMemory.addRegistration(jobId3, slaRegBean3);
@@ -142,9 +148,6 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SLACalcStatus calc2 = slaCalcMemory.get(jobId2);
SLACalcStatus calc3 = slaCalcMemory.get(jobId3);
- calc1.setEventProcessed(5);
- calc2.setEventProcessed(6);
- calc3.setEventProcessed(7);
calc1.setEventStatus(SLAEvent.EventStatus.END_MISS);
calc1.setSLAStatus(SLAEvent.SLAStatus.MISS);
@@ -154,11 +157,30 @@ public class TestSLACalculatorMemory extends XDataTestCase {
calc1.setLastModifiedTime(lastModifiedTime);
List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
- SLASummaryBean bean = new SLASummaryBean(calc1);
- bean.setActualStart(sdf.parse("2011-03-09"));
- bean.setActualEnd(sdf.parse("2011-03-10"));
- bean.setActualDuration(456);
- updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, bean));
+ WorkflowJobBean wf1 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId1);
+ wf1.setId(jobId1);
+ wf1.setStatus(WorkflowJob.Status.SUCCEEDED);
+ wf1.setStartTime(sdf.parse("2011-03-09"));
+ wf1.setEndTime(sdf.parse("2011-03-10"));
+ wf1.setLastModifiedTime(new Date());
+
+ WorkflowJobBean wf2 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId2);
+ wf2.setId(jobId2);
+ wf2.setStatus(WorkflowJob.Status.RUNNING);
+ wf2.setStartTime(sdf.parse("2011-03-09"));
+ wf2.setEndTime(null);
+ wf2.setLastModifiedTime(new Date());
+
+ WorkflowJobBean wf3 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId3);
+ wf3.setId(jobId3);
+ wf3.setStatus(WorkflowJob.Status.RUNNING);
+ wf3.setStartTime(startTime);
+ wf3.setEndTime(null);
+ wf3.setLastModifiedTime(new Date());
+
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf1));
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf2));
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf3));
updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
new SLASummaryBean(calc2)));
updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
@@ -169,10 +191,11 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
- assertEquals(2, slaCalcMemory.size());
- SLACalcStatus calc = slaCalcMemory.get(jobId1);
- assertEquals("job-1", calc.getId());
+ SLACalcStatus calc = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(
+ SLASummaryQuery.GET_SLA_SUMMARY, jobId1), SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_REG_ON_RESTART, jobId1));
+ assertEquals("job-1-W", calc.getId());
assertEquals(AppType.WORKFLOW_JOB, calc.getAppType());
assertEquals("app-name", calc.getAppName());
assertEquals(123, calc.getExpectedDuration());
@@ -188,22 +211,35 @@ public class TestSLACalculatorMemory extends XDataTestCase {
assertEquals("jobData", calc.getJobData());
assertEquals(sdf.parse("2011-03-09"), calc.getActualStart());
assertEquals(sdf.parse("2011-03-10"), calc.getActualEnd());
- assertEquals(456, calc.getActualDuration());
assertEquals(SLAEvent.EventStatus.END_MISS, calc1.getEventStatus());
assertEquals(SLAEvent.SLAStatus.MISS, calc1.getSLAStatus());
assertEquals(WorkflowJob.Status.FAILED.toString(), calc1.getJobStatus());
assertEquals(lastModifiedTime, calc1.getLastModifiedTime());
- assertEquals(5, calc.getEventProcessed());
- assertEquals(6, slaCalcMemory.get(jobId2).getEventProcessed());
- // jobId3 should be in history set as eventprocessed is 7 (111)
- assertEquals(2, slaCalcMemory.size()); // 2 out of 3 jobs in map
+ calc2 = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(
+ SLASummaryQuery.GET_SLA_SUMMARY, jobId2), SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_REG_ON_RESTART, jobId2));
+
+
+ assertEquals(8, calc.getEventProcessed());
+ assertEquals(7, calc2.getEventProcessed());
+ // jobId2 should be in the history set because its eventProcessed is 7 (111)
+ // jobId3 is the only job left in the SLA map
+ assertEquals(1, slaCalcMemory.size()); // 1 out of 3 jobs in map
+ WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId3);
+ wf.setId(jobId3);
+ wf.setStatus(WorkflowJob.Status.SUCCEEDED);
+ wf.setEndTime(endTime);
+ wf.setStartTime(startTime);
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wf);
+
slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
- sdf.parse("2011-03-09"), sdf.parse("2011-04-09"));
+ startTime, endTime);
+
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId3);
assertEquals(8, slaSummary.getEventProcessed());
- assertEquals(sdf.parse("2011-03-09"), slaSummary.getActualStart());
- assertEquals(sdf.parse("2011-04-09"), slaSummary.getActualEnd());
+ assertEquals(startTime, slaSummary.getActualStart());
+ assertEquals(endTime, slaSummary.getActualEnd());
assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
}
@@ -213,7 +249,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
- SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
String jobId1 = slaRegBean1.getId();
slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
@@ -240,11 +276,8 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-
- // As job succeeded, it should not be in memory
- assertEquals(0, slaCalcMemory.size());
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
- assertEquals("job-1", slaSummary.getId());
+ assertEquals("job-1-W", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType());
assertEquals("SUCCEEDED", slaSummary.getJobStatus());
@@ -270,7 +303,6 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
- assertEquals(0, slaCalcMemory.size());
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("FAILED", slaSummary.getJobStatus());
assertEquals(8, slaSummary.getEventProcessed());
@@ -293,15 +325,14 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-
- assertEquals(1, slaCalcMemory.size());
- SLACalcStatus calc = slaCalcMemory.get(jobId1);
- assertEquals(1, calc.getEventProcessed());
- assertEquals("RUNNING", calc.getJobStatus());
- assertEquals(sdf.parse("2012-02-07"), calc.getActualStart());
- assertNull(calc.getActualEnd());
- assertEquals(-1, calc.getActualDuration());
- assertEquals(SLAEvent.SLAStatus.IN_PROCESS, calc.getSLAStatus());
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean.getId());
+ // since the job is already running and it is an old job
+ assertEquals(7, slaSummary.getEventProcessed());
+ assertEquals("RUNNING", slaSummary.getJobStatus());
+ assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
+ assertNull(slaSummary.getActualEnd());
+ assertEquals(-1, slaSummary.getActualDuration());
+ assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus());
}
@Test
@@ -309,7 +340,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
- SLARegistrationBean slaRegBean1 = _createSLARegistration("job@1", AppType.WORKFLOW_ACTION);
+ SLARegistrationBean slaRegBean1 = _createSLARegistration("job-W@1", AppType.WORKFLOW_ACTION);
String jobId1 = slaRegBean1.getId();
slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
@@ -334,11 +365,10 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-
- // As job succeeded, it should not be in memory
+ slaCalcMemory.updateAllSlaStatus();
assertEquals(0, slaCalcMemory.size());
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
- assertEquals("job@1", slaSummary.getId());
+ assertEquals("job-W@1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType());
assertEquals("OK", slaSummary.getJobStatus());
@@ -355,7 +385,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
- SLARegistrationBean slaRegBean1 = _createSLARegistration("job@1", AppType.COORDINATOR_ACTION);
+ SLARegistrationBean slaRegBean1 = _createSLARegistration("job-C@1", AppType.COORDINATOR_ACTION);
String jobId1 = slaRegBean1.getId();
slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
@@ -374,23 +404,23 @@ public class TestSLACalculatorMemory extends XDataTestCase {
cab.setId(jobId1);
cab.setStatus(CoordinatorAction.Status.FAILED);
cab.setLastModifiedTime(sdf.parse("2013-02-07"));
- cab.setExternalId("wf_job");
+ cab.setExternalId("wf_job-W");
CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(cab);
jpaService.execute(caInsertCmd);
WorkflowJobBean wjb = new WorkflowJobBean();
- wjb.setId("wf_job");
+ wjb.setId("wf_job-W");
wjb.setStartTime(sdf.parse("2012-02-07"));
wjb.setLastModifiedTime(new Date());
WorkflowJobQueryExecutor.getInstance().insert(wjb);
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
-
+ slaCalcMemory.updateAllSlaStatus();
// As job succeeded, it should not be in memory
assertEquals(0, slaCalcMemory.size());
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
- assertEquals("job@1", slaSummary.getId());
+ assertEquals("job-C@1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType());
assertEquals("FAILED", slaSummary.getJobStatus());
@@ -402,6 +432,68 @@ public class TestSLACalculatorMemory extends XDataTestCase {
}
@Test
+ public void testEventMissOnRestart() throws Exception { // actions turn FAILED in the DB only; init() must then finish SLA processing
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // MM = month; lowercase mm is minute-of-hour
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+ CoordinatorActionBean coordAction = new CoordinatorActionBean();
+ coordAction.setId("coordActionId-C@1");
+ coordAction.setStatus(CoordinatorAction.Status.RUNNING);
+ coordAction.setLastModifiedTime(sdf.parse("2013-02-07"));
+ CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(coordAction);
+ jpaService.execute(caInsertCmd);
+ // Second coordinator action, also inserted as RUNNING.
+ CoordinatorActionBean coordAction2 = new CoordinatorActionBean();
+ coordAction2.setId("coordActionId-C@2");
+ coordAction2.setStatus(CoordinatorAction.Status.RUNNING);
+ coordAction2.setLastModifiedTime(sdf.parse("2013-02-07"));
+ caInsertCmd = new CoordActionInsertJPAExecutor(coordAction2);
+ jpaService.execute(caInsertCmd);
+ // Registration 1: expected start and expected end are both in the past.
+ SLARegistrationBean slaRegBean1 = _createSLARegistration("coordActionId-C@1", AppType.COORDINATOR_ACTION);
+ String jobId1 = slaRegBean1.getId();
+ slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
+ slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
+ slaRegBean1.setExpectedDuration(100000); // long duration;
+ slaCalcMemory.addRegistration(jobId1, slaRegBean1);
+ slaCalcMemory.updateAllSlaStatus();
+ // Registration 2: expected start and expected end are both in the future.
+ SLARegistrationBean slaRegBean2 = _createSLARegistration("coordActionId-C@2", AppType.COORDINATOR_ACTION);
+ String jobId2 = slaRegBean2.getId();
+ slaRegBean2.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); // 1 hour ahead
+ slaRegBean2.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000)); // 2 hours ahead
+ slaRegBean2.setExpectedDuration(100000); // long duration;
+ slaCalcMemory.addRegistration(jobId2, slaRegBean2);
+ slaCalcMemory.updateAllSlaStatus();
+ assertEquals(2, slaCalcMemory.size());
+ // Read back the persisted summaries of both actions.
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
+ SLASummaryBean slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2);
+ // Past-dated registration: eventProcessed is expected to be 5, no actual duration yet.
+ assertEquals("coordActionId-C@1", slaSummary.getId());
+ assertEquals(5, slaSummary.getEventProcessed());
+ assertEquals(-1, slaSummary.getActualDuration());
+ // Future-dated registration: eventProcessed is still 0.
+ assertEquals("coordActionId-C@2", slaSummary2.getId());
+ assertEquals(0, slaSummary2.getEventProcessed());
+ assertEquals(-1, slaSummary2.getActualDuration());
+ // Mark both actions FAILED directly in the DB; addJobStatus is never called for them.
+ coordAction.setStatusStr("FAILED");
+ coordAction2.setStatusStr("FAILED");
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction);
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction2);
+ // Re-run init on the same calculator (presumably standing in for a server restart).
+ slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
+ slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2);
+ // After init, both summaries must report eventProcessed == 8.
+ assertEquals("coordActionId-C@1", slaSummary.getId());
+ assertEquals(8, slaSummary.getEventProcessed());
+ assertEquals("coordActionId-C@2", slaSummary2.getId());
+ assertEquals(8, slaSummary2.getEventProcessed());
+
+ }
+ @Test
public void testSLAEvents1() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
@@ -428,13 +520,24 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
- slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
- sdf.parse("2012-01-01"), null);
+
+ job1.setStatusStr(WorkflowJob.Status.SUSPENDED.toString());
+ job1.setLastModifiedTime(new Date());
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, job1);
+
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUSPENDED.toString(), EventStatus.SUSPEND,
sdf.parse("2012-01-01"), null);
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(WorkflowJob.Status.SUSPENDED.toString(), slaSummary.getJobStatus());
+
assertEquals(5, slaSummary.getEventProcessed());
+ job1.setStatusStr(WorkflowJob.Status.SUCCEEDED.toString());
+ job1.setLastModifiedTime(new Date());
+ job1.setStartTime(sdf.parse("2012-01-01"));
+ job1.setEndTime(sdf.parse("2012-01-02"));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
@@ -442,7 +545,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// All events processed and actual times stored (1000)
assertEquals(8, slaSummary.getEventProcessed());
- assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+ assertEquals(SLAStatus.MET, slaSummary.getSLAStatus());
assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
assertEquals(SLAEvent.EventStatus.DURATION_MISS, slaSummary.getEventStatus());
assertEquals(sdf.parse("2012-01-01").getTime(), slaSummary.getActualStart().getTime());
@@ -479,6 +582,12 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummary);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ job1.setStatus(WorkflowJob.Status.SUCCEEDED);
+ job1.setStartTime(sdf.parse("2012-01-01"));
+ job1.setEndTime(sdf.parse("2012-01-02"));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
@@ -490,16 +599,21 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaSummary.setEventProcessed(1);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummary);
+ WorkflowJobBean job2 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
- slaRegBean = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+ slaRegBean = _createSLARegistration(job2.getId(), AppType.WORKFLOW_JOB);
slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000));
jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(jobId, slaRegBean);
assertEquals(1, slaCalcMemory.size());
+ job2.setStatus(WorkflowJob.Status.KILLED);
+ job2.setEndTime(sdf.parse("2012-01-02"));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job2);
- slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null,
+ slaCalcMemory.addJobStatus(job2.getId(), WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null,
sdf.parse("2012-01-02"));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// Actual start null, so all events processed
@@ -521,7 +635,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back
slaRegBean.setExpectedStart(startTime);
- slaRegBean.setExpectedDuration(3600 * 1000);
+ slaRegBean.setExpectedDuration(2* 3600 * 1000); //to avoid duration miss
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); // 1 hour ahead
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
@@ -530,11 +644,15 @@ public class TestSLACalculatorMemory extends XDataTestCase {
assertEquals(1, slaSummary.getEventProcessed());
assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
+ job1.setStatus(WorkflowJob.Status.RUNNING);
+ job1.setStartTime(startTime);
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
new Date(System.currentTimeMillis()), null);
- slaCalcMemory.updateJobSla(jobId);
- slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(1, slaSummary.getEventProcessed());
assertEquals(SLAStatus.IN_PROCESS, slaSummary.getSLAStatus());
assertEquals(WorkflowJob.Status.RUNNING.toString(), slaSummary.getJobStatus());
@@ -565,13 +683,21 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(4, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+
+ job1.setId(job1.getId());
+ job1.setStatus(WorkflowJob.Status.SUCCEEDED);
+ job1.setStartTime(new Date(System.currentTimeMillis()));
+ job1.setEndTime(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// Only Duration sla should be processed as end is already processed
// (110)
- assertEquals(6, slaSummary.getEventProcessed());
+ assertEquals(8, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
// Recieve start event
assertTrue(slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
@@ -598,15 +724,32 @@ public class TestSLACalculatorMemory extends XDataTestCase {
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
slaCalcMemory.updateJobSla(jobId);
+
+ job1.setId(job1.getId());
+ job1.setStatus(WorkflowJob.Status.RUNNING);
+ job1.setStartTime(new Date(System.currentTimeMillis() - 3600 * 1000));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
System.currentTimeMillis() - 3600 * 1000), null);
- slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// The actual end times are not stored, but sla's processed so (111)
assertEquals(7, slaSummary.getEventProcessed());
// Moved from map to history set
assertEquals(0, slaCalcMemory.size());
// Add terminal state event so actual end time is stored
+
+ job1.setId(job1.getId());
+ job1.setStatus(WorkflowJob.Status.SUCCEEDED);
+ job1.setEndTime(new Date(System.currentTimeMillis() - 3600 * 1000));
+ job1.setStartTime(new Date(System.currentTimeMillis()));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, new Date(
System.currentTimeMillis() - 3600 * 1000), new Date(System.currentTimeMillis()));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
@@ -634,16 +777,21 @@ public class TestSLACalculatorMemory extends XDataTestCase {
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
slaCalcMemory.updateJobSla(jobId);
+ job1.setStatusStr("RUNNING");
+ job1.setLastModifiedTime(new Date());
+ job1.setStartTime(startTime);
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
+
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
System.currentTimeMillis() - 3600 * 1000), null);
- slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// The actual end times are not stored, but sla's processed so (111)
assertEquals(7, slaSummary.getEventProcessed());
assertTrue(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
job1.setStatusStr("SUCCEEDED");
job1.setLastModifiedTime(new Date());
- WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, job1);
+ job1.setStartTime(startTime);
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
slaCalcMemory.new HistoryPurgeWorker().run();
assertFalse(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
}
@@ -742,5 +890,85 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalculator.addRegistration(slaRegBean.getId(), slaRegBean);
return action.getId();
}
+ @Test
+ public void testEventOutOfOrder() throws Exception { // a RUNNING status arriving after SUCCEEDED must not change the stored job status
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+ WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+ SLARegistrationBean slaRegBean = _createSLARegistration(wfJob.getId(), AppType.WORKFLOW_JOB);
+ Date startTime = new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000); // 1 hour ahead
+ slaRegBean.setExpectedStart(startTime);
+ slaRegBean.setExpectedDuration(3600 * 1000);
+ slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour back
+ String jobId = slaRegBean.getId();
+ slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+ slaCalcMemory.updateJobSla(jobId);
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+ slaRegBean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
+ assertEquals(WorkflowJob.Status.RUNNING.toString(), slaSummary.getJobStatus()); // (expected, actual) order
+ // Finish the workflow in the DB, then deliver the matching SUCCEEDED status to the calculator.
+ wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+ wfJob.setEndTime(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob);
+ slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+ new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+ assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
+ // Late RUNNING status delivered after the terminal one. NOTE(review): EventStatus.SUCCESS is kept from the original - confirm STARTED was not intended.
+ slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.SUCCESS,
+ new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
+ // The persisted summary must still say SUCCEEDED.
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+ assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
+ }
+
+ public void testWFEndNotCoord() throws Exception { // the SLA job status of a coord action must follow the coord action, not its workflow
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // MM = month; lowercase mm is minute-of-hour
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
+ SLARegistrationBean slaRegBean = _createSLARegistration("job-C@1", AppType.COORDINATOR_ACTION);
+ String coordActionId = slaRegBean.getId();
+ slaRegBean.setExpectedEnd(sdf.parse("2013-03-07"));
+ slaRegBean.setExpectedStart(sdf.parse("2012-03-07"));
+ slaCalcMemory.addRegistration(coordActionId, slaRegBean);
+ SLACalcStatus calc1 = slaCalcMemory.get(coordActionId);
+ calc1.setEventProcessed(1);
+ calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
+ calc1.setJobStatus(WorkflowAction.Status.RUNNING.name());
+ calc1.setLastModifiedTime(new Date());
+ SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
+ // Persist the summary as RUNNING / IN_PROCESS with eventProcessed = 1.
+ SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean);
+
+ // Coord action is still RUNNING in the DB while its workflow (wf_job-W) has already SUCCEEDED
+ CoordinatorActionBean coordAction = new CoordinatorActionBean();
+ coordAction.setId(coordActionId);
+ coordAction.setStatus(CoordinatorAction.Status.RUNNING);
+ coordAction.setLastModifiedTime(sdf.parse("2013-02-07"));
+ coordAction.setExternalId("wf_job-W");
+ CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(coordAction);
+ jpaService.execute(caInsertCmd);
+ WorkflowJobBean wjb = new WorkflowJobBean();
+ wjb.setId("wf_job-W");
+ wjb.setStartTime(sdf.parse("2012-02-07"));
+ wjb.setLastModifiedTime(new Date());
+ wjb.setStatus(WorkflowJob.Status.SUCCEEDED);
+ WorkflowJobQueryExecutor.getInstance().insert(wjb);
+ // Recompute the SLA for the coord action and re-read its summary.
+ calc1 = slaCalcMemory.get(coordActionId);
+ slaCalcMemory.updateJobSla(coordActionId);
+ slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, coordActionId);
+ // coord action is still running although the wf job has completed
+ assertEquals(CoordinatorAction.Status.RUNNING.name(), slaSummaryBean.getJobStatus());
+ // Now complete the coord action itself in the DB.
+ coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction);
+ // Deliver the terminal status to the calculator.
+ slaCalcMemory.addJobStatus(coordActionId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
+ sdf.parse("2012-02-07"), sdf.parse("2012-03-07"));
+ // The summary must now report SUCCEEDED.
+ slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, coordActionId);
+ assertEquals(CoordinatorAction.Status.SUCCEEDED.toString(), slaSummaryBean.getJobStatus());
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
index 7a710c2..06f54f2 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
@@ -513,6 +513,7 @@ public class TestSLAEventGeneration extends XDataTestCase {
wf.setId(action.getExternalId());
wf.setStatus(WorkflowJob.Status.KILLED);
wf.setParentId(action.getId());
+ wf.setEndTime(new Date());
jpa.execute(new WorkflowJobInsertJPAExecutor(wf));
new CoordActionUpdateXCommand(wf).call();
assertEquals(1, ehs.getEventQueue().size());
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
index ebb12f7..7d40e31 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
@@ -18,22 +18,32 @@
package org.apache.oozie.sla;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.event.CoordinatorActionEvent;
-import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.event.listener.JobEventListener;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.listener.SLAJobEventListener;
@@ -81,61 +91,79 @@ public class TestSLAJobEventListener extends XTestCase {
SLAJobEventListener listener = new SLAJobEventListener();
listener.init(services.getConf());
// add dummy registration events to the SLAService map
- SLARegistrationBean job = _createSLARegBean("wf1", AppType.WORKFLOW_JOB);
+ SLARegistrationBean job = _createSLARegBean("wf1-W", AppType.WORKFLOW_JOB);
job.setExpectedStart(DateUtils.parseDateUTC("2012-07-22T00:00Z"));
+ job.setExpectedEnd(DateUtils.parseDateUTC("2012-07-23T00:00Z"));
slas.addRegistrationEvent(job);
assertEquals(1, slas.getSLACalculator().size());
Date actualStart = DateUtils.parseDateUTC("2012-07-22T01:00Z");
- WorkflowJobEvent wfe = new WorkflowJobEvent("wf1", "caId1", WorkflowJob.Status.RUNNING, "user1",
+
+ createWorkflow("wf1-W", actualStart);
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wf1-W", "caId1", WorkflowJob.Status.RUNNING, "user1",
"wf-app-name1", actualStart, null);
listener.onWorkflowJobEvent(wfe);
- SLACalcStatus serviceObj = slas.getSLACalculator().get("wf1");
+ SLACalcStatus serviceObj = slas.getSLACalculator().get("wf1-W");
+
+ // job will be checked against DB since it's an old job. All events will get evaluated and the job will move to the history set.
// check that start sla has been calculated
- assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus());
- assertEquals(1, serviceObj.getEventProcessed()); //Job switching to running is only partially
- //sla processed. so state = 1
+ assertEquals(EventStatus.END_MISS, serviceObj.getEventStatus());
+ assertEquals(7, serviceObj.getEventProcessed()); //Job switching to running is only partially
+ assertEquals(0, slas.getSLACalculator().size());
+
+
+ createWorkflowAction("wfId1-W@wa1", "wf1-W");
+ job = _createSLARegBean("wfId1-W@wa1", AppType.WORKFLOW_ACTION);
+ job.setExpectedEnd(DateUtils.parseDateUTC("2012-07-22T01:00Z"));
- job = _createSLARegBean("wfId1@wa1", AppType.WORKFLOW_ACTION);
slas.addRegistrationEvent(job);
- assertEquals(2, slas.getSLACalculator().size());
+ assertEquals(1, slas.getSLACalculator().size());
job.setExpectedStart(DateUtils.parseDateUTC("2012-07-22T00:00Z"));
- WorkflowActionEvent wae = new WorkflowActionEvent("wfId1@wa1", "wfId1", WorkflowAction.Status.RUNNING, "user1",
+ WorkflowActionEvent wae = new WorkflowActionEvent("wfId1-W@wa1", "wf1-W", WorkflowAction.Status.RUNNING, "user1",
"wf-app-name1", actualStart, null);
listener.onWorkflowActionEvent(wae);
- serviceObj = slas.getSLACalculator().get("wfId1@wa1");
+ serviceObj = slas.getSLACalculator().get("wfId1-W@wa1");
// check that start sla has been calculated
- assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus());
+ assertEquals(EventStatus.END_MISS, serviceObj.getEventStatus());
+ createCoord("cj1-C");
- job = _createSLARegBean("cj1", AppType.COORDINATOR_JOB);
+ CoordinatorActionBean coordAction= createCoordAction("cj1-C@ca1", "cj1-C");
+ job = _createSLARegBean("cj1-C@ca1", AppType.COORDINATOR_ACTION);
job.setExpectedEnd(DateUtils.parseDateUTC("2012-07-22T01:00Z"));
+ Date actualEnd = DateUtils.parseDateUTC("2012-07-22T02:00Z");
slas.addRegistrationEvent(job);
- assertEquals(3, slas.getSLACalculator().size());
- Date actualEnd = DateUtils.parseDateUTC("2012-07-22T00:00Z");
- CoordinatorJobEvent cje = new CoordinatorJobEvent("cj1", "bj1", CoordinatorJob.Status.SUCCEEDED, "user1",
- "coord-app-name1", actualStart, actualEnd);
- listener.onCoordinatorJobEvent(cje);
-
- SLASummaryBean summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1");
- // check that end and duration sla has been calculated
- assertEquals(6, summary.getEventProcessed());
-
- assertEquals(EventStatus.END_MET, summary.getEventStatus());
-
- job = _createSLARegBean("cj1@ca1", AppType.COORDINATOR_ACTION);
- actualEnd = DateUtils.parseDateUTC("2012-07-22T02:00Z");
- slas.addRegistrationEvent(job);
- assertEquals(4, slas.getSLACalculator().size());
- CoordinatorActionEvent cae = new CoordinatorActionEvent("cj1@ca1", "cj1", CoordinatorAction.Status.RUNNING, "user1",
+ assertEquals(1, slas.getSLACalculator().size());
+ CoordinatorActionEvent cae = new CoordinatorActionEvent("cj1-C@ca1", "cj1-C", CoordinatorAction.Status.RUNNING, "user1",
"coord-app-name1", null, actualEnd, null);
listener.onCoordinatorActionEvent(cae);
- cae = new CoordinatorActionEvent("cj1@ca1", "cj1", CoordinatorAction.Status.KILLED, "user1",
+ coordAction.setStatus(CoordinatorAction.Status.KILLED);
+ coordAction.setLastModifiedTime(new Date());
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction);
+
+ cae = new CoordinatorActionEvent("cj1-C@ca1", "cj1-C", CoordinatorAction.Status.KILLED, "user1",
"coord-app-name1", null, actualEnd, null);
listener.onCoordinatorActionEvent(cae);
- summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1@ca1");
+ SLASummaryBean summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1-C@ca1");
// check that all events are processed
assertEquals(8, summary.getEventProcessed());
assertEquals(EventStatus.END_MISS, summary.getEventStatus());
- assertEquals(3, slas.getSLACalculator().size());
+ //all jobs are processed
+ assertEquals(0, slas.getSLACalculator().size());
+
+ job = _createSLARegBean("wf2-W", AppType.WORKFLOW_JOB);
+ job.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); //2 hour before
+ job.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hours after
+ slas.addRegistrationEvent(job);
+ assertEquals(1, slas.getSLACalculator().size());
+
+ createWorkflow("wf2-W", new Date());
+ wfe = new WorkflowJobEvent("wf2-W", "caId2", WorkflowJob.Status.RUNNING, "user1",
+ "wf-app-name1", null, null);
+ listener.onWorkflowJobEvent(wfe);
+ serviceObj = slas.getSLACalculator().get("wf2-W");
+
+ assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus());
+ assertEquals(3, serviceObj.getEventProcessed()); //Only duration and start are processed. Duration = -1
+ assertEquals(1, slas.getSLACalculator().size());
}
@@ -145,4 +173,44 @@ public class TestSLAJobEventListener extends XTestCase {
reg.setAppType(appType);
return reg;
}
+ private WorkflowJobBean createWorkflow(String id, Date actualStart) throws Exception { // Test helper: persists a workflow job bean with the given id and start time, then returns it
+ List<JsonBean> insertList = new ArrayList<JsonBean>();
+ WorkflowJobBean workflow = new WorkflowJobBean();
+ workflow.setId(id);
+ workflow.setStatusStr("PREP"); // the stored status is always PREP, whatever status the caller's later event carries
+ workflow.setStartTime(actualStart);
+ workflow.setSlaXml("<sla></sla>"); // empty sla element; presumably marks the job as SLA-enabled - TODO confirm
+ insertList.add(workflow);
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); // insert only; the other two arguments (presumably update/delete lists) are null
+ return workflow;
+ }
+
+ private WorkflowActionBean createWorkflowAction(String id, String parentId) throws Exception { // Test helper: persists a workflow action bean with only its id and parent job id set, then returns it
+ List<JsonBean> insertList = new ArrayList<JsonBean>();
+ WorkflowActionBean action = new WorkflowActionBean();
+ action.setId(id);
+ action.setJobId(parentId); // parentId is the id of the owning workflow job
+ insertList.add(action);
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); // insert only; the other two arguments (presumably update/delete lists) are null
+ return action;
+ }
+
+ private CoordinatorActionBean createCoordAction(String id, String parentId) throws Exception { // Test helper: persists a coordinator action bean with only its id and parent job id set, then returns it
+ List<JsonBean> insertList = new ArrayList<JsonBean>();
+ CoordinatorActionBean action = new CoordinatorActionBean();
+ action.setId(id);
+ action.setJobId(parentId); // parentId is the id of the owning coordinator job
+ insertList.add(action);
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); // insert only; the other two arguments (presumably update/delete lists) are null
+ return action; // returned so the caller can later change its status and update the stored row
+ }
+
+ private CoordinatorJobBean createCoord(String id) throws Exception { // Test helper: persists a coordinator job bean that has only its id set, then returns it
+ List<JsonBean> insertList = new ArrayList<JsonBean>();
+ CoordinatorJobBean job = new CoordinatorJobBean();
+ job.setId(id);
+ insertList.add(job);
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); // insert only; the other two arguments (presumably update/delete lists) are null
+ return job;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
index c3bc110..1e19923 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
@@ -25,16 +25,17 @@ import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
@@ -100,7 +101,9 @@ public class TestSLAService extends XDataTestCase {
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
// test start-miss
- SLARegistrationBean sla1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+
+ SLARegistrationBean sla1 = _createSLARegistration(wfJob.getId(), AppType.WORKFLOW_JOB);
sla1.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); //1 hour back
sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); //1 hour back
sla1.setExpectedDuration(10 * 60 * 1000); //10 mins
@@ -113,22 +116,48 @@ public class TestSLAService extends XDataTestCase {
output.setLength(0);
// test different jobs and events start-met and end-miss
- sla1 = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+ WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+
+ sla1 = _createSLARegistration(wfJob2.getId(), AppType.WORKFLOW_JOB);
sla1.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead
sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 3600 * 1000)); //2 hours ahead
slas.addRegistrationEvent(sla1);
+
+ wfJob2.setStatusStr("RUNNING");
+ wfJob2.setLastModifiedTime(new Date());
+ wfJob2.setStartTime(new Date());
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob2);
+
slas.addStatusEvent(sla1.getId(), WorkflowJob.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
new Date());
- SLARegistrationBean sla2 = _createSLARegistration("job-3", AppType.COORDINATOR_JOB);
+ CoordinatorActionBean action = addRecordToCoordActionTable("coord_id-C", 1,
+ CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0);
+
+ SLARegistrationBean sla2 = _createSLARegistration(action.getId(), AppType.COORDINATOR_ACTION);
sla2.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead only for testing
sla2.setExpectedEnd(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); //2 hours back
sla2.setExpectedDuration(10); //to process duration too
slas.addRegistrationEvent(sla2);
assertEquals(3, slas.getSLACalculator().size());
+
Date startTime = new Date();
- slas.addStatusEvent(sla2.getId(), CoordinatorJob.Status.RUNNING.name(), EventStatus.STARTED, startTime,
- null);
- slas.addStatusEvent(sla2.getId(), CoordinatorJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, startTime,
+ WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+ wfJob3.setStatusStr("SUCCEEDED");
+ wfJob3.setLastModifiedTime(new Date());
+ wfJob3.setStartTime(startTime);
+ wfJob3.setEndTime(startTime);
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob3);
+
+ action.setCreatedTime(startTime);
+ action.setStatus(CoordinatorAction.Status.SUCCEEDED);
+ action.setLastModifiedTime(new Date());
+ action.setExternalId(wfJob3.getId());
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action);
+ slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
+ new Date());
+ slas.addStatusEvent(sla2.getId(), CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, startTime,
new Date());
slas.runSLAWorker();
ehs.new EventWorker().run();
@@ -139,6 +168,11 @@ public class TestSLAService extends XDataTestCase {
// test same job multiple events (start-miss, end-miss) through regular check
WorkflowJobBean job4 = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
+ job4.setLastModifiedTime(new Date());
+ job4.setEndTime(new Date());
+ job4.setStartTime(new Date());
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job4);
sla2 = _createSLARegistration(job4.getId(), AppType.WORKFLOW_JOB);
sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); //2 hours back
sla2.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); //1 hour back
@@ -151,14 +185,13 @@ public class TestSLAService extends XDataTestCase {
output.setLength(0);
// As expected duration is not set, duration shall be processed and job removed from map
assertEquals(2, slas.getSLACalculator().size());
+
// test same job multiple events (start-met, end-met) through job status event
- sla1 = _createSLARegistration("action@1", AppType.COORDINATOR_ACTION);
+ sla1 = _createCoordActionSLARegistration(CoordinatorAction.Status.SUCCEEDED.name());
sla1.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead
sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 3600 * 1000)); //2 hours ahead
slas.addRegistrationEvent(sla1);
assertEquals(3, slas.getSLACalculator().size());
- slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
- new Date());
slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS,
new Date(), new Date());
slas.runSLAWorker();
@@ -174,39 +207,40 @@ public class TestSLAService extends XDataTestCase {
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
JPAService jpaService = Services.get().get(JPAService.class);
+ Date date = new Date();
// CASE 1: positive test WF job
WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
SLARegistrationBean sla = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
- sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1800 * 1000)); // half hour back
+ sla.setExpectedEnd(new Date(date.getTime() - 1 * 1800 * 1000)); // half hour back
slas.addRegistrationEvent(sla);
// CASE 2: negative test WF job
WorkflowJobBean job2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
- job2.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 * 1000));
- job2.setStartTime(new Date(System.currentTimeMillis() - 1 * 2000 * 1000));
+ job2.setEndTime(new Date(date.getTime() - 1 * 1800 * 1000));
+ job2.setStartTime(new Date(date.getTime() - 1 * 2000 * 1000));
job2.setLastModifiedTime(new Date());
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job2);
sla = _createSLARegistration(job2.getId(), AppType.WORKFLOW_JOB);
- sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1500 * 1000)); // in past but > actual end
+ sla.setExpectedEnd(new Date(date.getTime() - 1 * 1500 * 1000)); // in past but > actual end
sla.setExpectedDuration(100); //unreasonable to cause MISS
slas.addRegistrationEvent(sla);
+ slas.runSLAWorker();
+
// CASE 3: positive test Coord action
- CoordinatorActionBean action1 = addRecordToCoordActionTable("coord-action-1", 1,
+ CoordinatorActionBean action1 = addRecordToCoordActionTable("coord-action-C@1", 1,
CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
- WorkflowJobBean extWf = new WorkflowJobBean();
- extWf.setId(action1.getExternalId());
- extWf.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 * 1000));
- extWf.setStartTime(new Date(System.currentTimeMillis() - 1 * 2100 * 1000));
- jpaService.execute(new WorkflowJobInsertJPAExecutor(extWf));
+ action1.setExternalId(null);
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action1);
+
sla = _createSLARegistration(action1.getId(), AppType.COORDINATOR_ACTION);
sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 2000 * 1000)); // past
slas.addRegistrationEvent(sla);
// CASE 4: positive test coord action
- CoordinatorActionBean action2 = addRecordToCoordActionTable("coord-action-2", 1,
+ CoordinatorActionBean action2 = addRecordToCoordActionTable("coord-action-C@2", 1,
CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0);
- extWf = new WorkflowJobBean();
+ WorkflowJobBean extWf = new WorkflowJobBean();
extWf.setId(action2.getExternalId());
// actual end before expected. but action is failed
extWf.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 * 1000));
@@ -217,7 +251,7 @@ public class TestSLAService extends XDataTestCase {
slas.addRegistrationEvent(sla);
// CASE 5: negative test coord action
- CoordinatorActionBean action3 = addRecordToCoordActionTable("coord-action-3", 1,
+ CoordinatorActionBean action3 = addRecordToCoordActionTable("coord-action-C@3", 1,
CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
extWf = new WorkflowJobBean();
extWf.setId(action3.getExternalId());
@@ -271,16 +305,25 @@ public class TestSLAService extends XDataTestCase {
assertNull(slas.getSLACalculator().get(action2.getId())); //removed from memory
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action1.getId());
- extWf = jpaService.execute(new WorkflowJobGetJPAExecutor(action1.getExternalId()));
- assertEquals(extWf.getStartTime(), slaSummary.getActualStart());
- assertEquals(extWf.getEndTime(), slaSummary.getActualEnd());
- assertEquals(extWf.getEndTime().getTime() - extWf.getStartTime().getTime(), slaSummary.getActualDuration());
+ assertNull(slaSummary.getActualStart());
+ assertNull(slaSummary.getActualEnd());
assertEquals(action1.getStatusStr(), slaSummary.getJobStatus());
assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
- assertEquals(8, slaSummary.getEventProcessed());
- assertNull(slas.getSLACalculator().get(action1.getId())); //removed from memory
+ assertEquals(7, slaSummary.getEventProcessed());
+ assertNotNull(slas.getSLACalculator().get(action1.getId()));
+ //From waiting to TIMEOUT with wf jobid
+ action1.setStatus(CoordinatorAction.Status.TIMEDOUT);
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action1);
+ slas.getSLACalculator().addJobStatus(action1.getId(), null, null, null, null);
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action1.getId());
+ assertNull(slaSummary.getActualStart());
+ assertNotNull(slaSummary.getActualEnd());
+ assertEquals("TIMEDOUT", slaSummary.getJobStatus());
+ assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
+ assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
+ assertEquals(8, slaSummary.getEventProcessed());
}
/**
@@ -307,6 +350,28 @@ public class TestSLAService extends XDataTestCase {
return bean;
}
+ public SLARegistrationBean _createCoordActionSLARegistration(String status) throws Exception { // Test helper: stores a workflow job and a coordinator action pointing at it, both with the given status, and returns an unsaved COORDINATOR_ACTION SLA registration for that action
+ WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); // inserted as RUNNING first; the requested status is applied by the update below
+ wfJob.setLastModifiedTime(new Date());
+ wfJob.setStartTime(new Date());
+ wfJob.setEndTime(new Date());
+ wfJob.setStatusStr(status);
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob);
+
+ CoordinatorActionBean action = addRecordToCoordActionTable(new Date().getTime() + "-C", 1, // job id is derived from the current time in millis
+ CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0); // inserted as TIMEDOUT first; overwritten with the requested status below
+ action.setExternalId(wfJob.getId()); // link the action to the workflow created above
+ action.setStatusStr(status); // NOTE(review): the same status string is used for workflow and action, so it should be a name valid for both
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action);
+
+ SLARegistrationBean bean = new SLARegistrationBean(); // only id and app type are set; callers fill in the expected times
+ bean.setId(action.getId());
+ bean.setAppType(AppType.COORDINATOR_ACTION);
+ return bean;
+ }
+
+
public static void assertEventNoDuplicates(String outputStr, String eventMsg) {
int index = outputStr.indexOf(eventMsg);
assertTrue(index != -1);
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 412128c..8ea3c4a 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2509 SLA job status can stuck in running state (puru)
OOZIE-2529 Support adding secret keys to Credentials of Launcher (satishsaley via rohini)
OOZIE-1402 Increase retry interval for non-progressing coordinator action with fix value (satishsaley via puru)
OOZIE-2512 ShareLibservice returns incorrect path for jar (satishsaley via puru)
[3/3] oozie git commit: OOZIE-2509 SLA job status can stuck in
running state
Posted by pu...@apache.org.
OOZIE-2509 SLA job status can stuck in running state
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ba7a7b85
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ba7a7b85
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ba7a7b85
Branch: refs/heads/master
Commit: ba7a7b85e040a313fa107474768dd67a325f91d5
Parents: 5fbd3eb
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue May 17 15:08:35 2016 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue May 17 15:08:35 2016 -0700
----------------------------------------------------------------------
.../command/coord/CoordActionCheckXCommand.java | 6 +-
.../sla/SLACoordActionJobEventXCommand.java | 80 ++
.../sla/SLACoordActionJobHistoryXCommand.java | 78 ++
.../oozie/command/sla/SLAJobEventXCommand.java | 301 ++++++
.../command/sla/SLAJobHistoryXCommand.java | 127 +++
.../sla/SLAWorkflowActionJobEventXCommand.java | 62 ++
.../SLAWorkflowActionJobHistoryXCommand.java | 57 ++
.../sla/SLAWorkflowJobEventXCommand.java | 64 ++
.../sla/SLAWorkflowJobHistoryXCommand.java | 56 ++
.../jpa/CoordActionGetForSLAJPAExecutor.java | 82 --
.../executor/jpa/CoordActionQueryExecutor.java | 13 +-
.../executor/jpa/SLASummaryQueryExecutor.java | 9 -
.../jpa/WorkflowActionGetForSLAJPAExecutor.java | 78 --
.../jpa/WorkflowActionQueryExecutor.java | 13 +-
.../jpa/WorkflowJobGetForSLAJPAExecutor.java | 78 --
.../executor/jpa/WorkflowJobQueryExecutor.java | 13 +-
.../oozie/service/ConfigurationService.java | 12 +-
.../org/apache/oozie/sla/SLACalcStatus.java | 54 -
.../apache/oozie/sla/SLACalculatorMemory.java | 989 +++----------------
.../org/apache/oozie/sla/SLASummaryBean.java | 2 -
.../apache/oozie/sla/SLAXCommandFactory.java | 92 ++
.../coord/TestCoordActionsKillXCommand.java | 11 +-
.../jpa/TestSLASummaryQueryExecutor.java | 2 +-
.../apache/oozie/service/TestHASLAService.java | 64 +-
.../oozie/sla/TestSLACalculatorMemory.java | 344 +++++--
.../oozie/sla/TestSLAEventGeneration.java | 1 +
.../oozie/sla/TestSLAJobEventListener.java | 136 ++-
.../org/apache/oozie/sla/TestSLAService.java | 123 ++-
release-log.txt | 1 +
29 files changed, 1668 insertions(+), 1280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
index 128feb2..bdbbd24 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
@@ -47,9 +47,10 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
/**
* The command checks workflow status for coordinator action.
@@ -177,7 +178,8 @@ public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> {
coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId));
coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
coordAction.getJobId()));
- workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId()));
+ workflowJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+ coordAction.getExternalId());
LogUtils.setLogInfo(coordAction);
}
else {
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
new file mode 100644
index 0000000..dfe2637
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+public class SLACoordActionJobEventXCommand extends SLAJobEventXCommand { // SLA job-event command for coordinator actions: loads the action (and its workflow, if any) and copies status and actual times into slaCalc
+ CoordinatorActionBean ca; // coordinator action fetched in loadState() by slaCalc's id
+ WorkflowJobBean wf; // workflow referenced by ca's external id; stays null when the action has no external id
+
+ public SLACoordActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) { // both arguments are passed straight to the superclass constructor
+ super(slaCalc, lockTimeOut);
+ }
+
+ @Override
+ protected void loadState() throws CommandException { // reads the coordinator action and, when it has an external id, the matching workflow job
+ try {
+ ca = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, slaCalc.getId());
+ if (ca.getExternalId() != null) { // no external id means there is no workflow to look up
+ wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, ca.getExternalId());
+ }
+ LogUtils.setLogInfo(ca);
+ }
+ catch (JPAExecutorException e) { // persistence failures are rethrown wrapped in CommandException
+ throw new CommandException(e);
+ }
+ }
+
+
+ protected void updateJobInfo() { // copies job status, actual start/end and the ended/end-miss flags from ca and wf into this command and slaCalc
+ if (ca.isTerminalStatus()) { // terminal action: flag it as ended and work out the actual end time
+ setEnded(true);
+ setEndMiss(ca.isTerminalWithFailure()); // initial end-miss verdict: whether the terminal status is a failure
+ slaCalc.setActualEnd(ca.getLastModifiedTime()); // default actual end; replaced below when the workflow has an end time
+ if (wf != null) {
+ if (wf.getEndTime() != null) {
+ if (slaCalc.getExpectedEnd() != null
+ && wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { // workflow ended after the expected end
+ setEndMiss(true);
+ }
+ slaCalc.setActualEnd(wf.getEndTime()); // the workflow's end time takes precedence over the action's last-modified time
+ }
+ slaCalc.setActualStart(wf.getStartTime());
+ }
+ }
+ else { // action not terminal yet: only the actual start can be recorded
+ if (wf != null) {
+ slaCalc.setActualStart(wf.getStartTime());
+ }
+ }
+ slaCalc.setJobStatus(ca.getStatusStr()); // job status always comes from the coordinator action, not from the workflow
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
new file mode 100644
index 0000000..b7f09d3
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * History command for a coordinator action: once the action is in a terminal status, its SLA summary is
+ * updated with the start/end times of the associated workflow (or with the action's last-modified time when
+ * the action has no external id).
+ */
+public class SLACoordActionJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    CoordinatorActionBean cAction = null;
+
+    public SLACoordActionJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Reads the coordinator action identified by {@code jobId} and hands it to {@link LogUtils#setLogInfo}.
+     *
+     * @throws CommandException wrapping a {@link JPAExecutorException} from the lookup
+     */
+    protected void loadState() throws CommandException {
+        try {
+            cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, jobId);
+        }
+        catch (JPAExecutorException jpaEx) {
+            throw new CommandException(jpaEx);
+        }
+        LogUtils.setLogInfo(cAction);
+    }
+
+    /**
+     * Updates the SLA summary for the loaded coordinator action.
+     *
+     * @throws CommandException wrapping a {@link JPAExecutorException} from the update
+     */
+    protected void updateSLASummary() throws CommandException {
+        try {
+            updateSLASummaryForCoordAction(cAction);
+        }
+        catch (JPAExecutorException jpaEx) {
+            throw new CommandException(jpaEx);
+        }
+    }
+
+    /**
+     * Updates the SLA summary of {@code bean}.
+     *
+     * @param bean the coordinator action whose summary is updated
+     * @throws JPAExecutorException if a lookup or the update fails
+     */
+    protected void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException {
+        String workflowId = bean.getExternalId();
+        if (workflowId == null) {
+            // No workflow is attached: no start time, and the action's last-modified time serves as end time.
+            updateSLASummary(bean.getId(), null, bean.getLastModifiedTime(), bean.getStatusStr());
+            return;
+        }
+
+        WorkflowJobBean workflow = WorkflowJobQueryExecutor.getInstance().get(
+                WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, workflowId);
+        if (workflow != null) {
+            updateSLASummary(bean.getId(), workflow.getStartTime(), workflow.getEndTime(), bean.getStatusStr());
+        }
+    }
+
+    @Override
+    protected boolean isJobEnded() {
+        return cAction.isTerminalStatus();
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
new file mode 100644
index 0000000..9b18606
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import java.util.Date;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.event.SLAEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.sla.SLASummaryBean;
+
+/**
+ * Base command that evaluates the SLA of a single job and persists the outcome.
+ * <p>
+ * {@link #execute()} first lets the subclass refresh {@code slaCalc} through {@link #updateJobInfo()}, then
+ * queues start / duration / end SLA events as appropriate and finally writes the SLA summary to the DB.
+ * <p>
+ * As used in this class, {@code eventProcessed} is a small bit set: 1 = start event handled, 2 = duration event
+ * handled, 4 = end event handled; the value 8 marks a job whose SLA processing is complete.
+ */
+public abstract class SLAJobEventXCommand extends XCommand<Void> {
+    private long lockTimeOut = 0;
+    JPAService jpaService = Services.get().get(JPAService.class);
+    SLACalcStatus slaCalc;
+    static final String SLA_LOCK_PREFIX = "sla_";
+    private boolean isEnded = false;
+    private boolean isEndMiss = false;
+
+    /**
+     * @param slaCalc the SLA status object evaluated and persisted by this command
+     * @param lockTimeOut the value returned by {@link #getLockTimeOut()}
+     */
+    public SLAJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super("SLA.job.event", "SLA.job.event", 1);
+        this.slaCalc = slaCalc;
+        this.lockTimeOut = lockTimeOut;
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    /**
+     * @return {@link #SLA_LOCK_PREFIX} followed by the id of the SLA job
+     */
+    @Override
+    public String getEntityKey() {
+        return SLA_LOCK_PREFIX + slaCalc.getId();
+    }
+
+    @Override
+    protected long getLockTimeOut() {
+        return lockTimeOut;
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+    /**
+     * Refreshes the job information, processes the SLA for an ended or a still running job and stores the result.
+     *
+     * @return always {@code null}
+     * @throws CommandException if writing the SLA summary fails
+     */
+    @Override
+    protected Void execute() throws CommandException {
+        updateJobInfo();
+        if (isEnded) {
+            processForEnd();
+        }
+        else {
+            processForRunning();
+        }
+        try {
+            writeToDB();
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+        return null;
+    }
+
+    /**
+     * Refreshes {@code slaCalc} from the job loaded by the subclass (job status, actual start and end) and calls
+     * {@link #setEnded(boolean)} / {@link #setEndMiss(boolean)} when the job has ended. {@link #execute()} reads
+     * those flags right after this call.
+     */
+    protected abstract void updateJobInfo();
+
+    /**
+     * Should alert.
+     *
+     * @param slaObj the sla obj
+     * @return true unless the SLA config map contains {@link OozieClient#SLA_DISABLE_ALERT}
+     */
+    private boolean shouldAlert(SLACalcStatus slaObj) {
+        return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
+    }
+
+    /**
+     * Queue event.
+     *
+     * @param event the event
+     */
+    private void queueEvent(SLACalcStatus event) {
+        Services.get().get(EventHandlerService.class).queueEvent(event);
+    }
+
+    /**
+     * Process duration sla. Nothing happens when {@code expected} is -1.
+     *
+     * @param expected the expected duration, or -1 when no duration is expected
+     * @param actual the actual duration
+     * @param slaCalc the sla calc
+     */
+    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
+        if (expected == -1) {
+            return;
+        }
+        slaCalc.setEventStatus(actual > expected ? EventStatus.DURATION_MISS : EventStatus.DURATION_MET);
+        if (shouldAlert(slaCalc)) {
+            queueEvent(new SLACalcStatus(slaCalc));
+        }
+    }
+
+    /**
+     * Write SLA object to DB.
+     *
+     * @throws JPAExecutorException the JPA executor exception
+     */
+    private void writeToDB() throws JPAExecutorException {
+        byte eventProc = slaCalc.getEventProcessed();
+        // no more processing, no transfer to history set
+        if (eventProc >= 8) {
+            // Clamp the local copy as well: it is the value that gets persisted below.
+            eventProc = 8;
+            slaCalc.setEventProcessed(8);
+        }
+
+        SLASummaryBean slaSummaryBean = new SLASummaryBean();
+        slaSummaryBean.setId(slaCalc.getId());
+        slaSummaryBean.setEventProcessed(eventProc);
+        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+        slaSummaryBean.setActualStart(slaCalc.getActualStart());
+        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+        slaSummaryBean.setLastModifiedTime(new Date());
+
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+                slaSummaryBean);
+
+        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", slaCalc.getId(),
+                slaCalc.getEventProcessed(), slaCalc.getJobStatus());
+
+    }
+
+    /**
+     * Process for end. Sets the final SLA status, queues whichever of the start / duration / end events have not
+     * been handled yet and marks the job as fully processed (8).
+     */
+    private void processForEnd() {
+        byte eventProc = slaCalc.getEventProcessed();
+
+        LOG.debug("Job {0} has ended. endtime = [{1}]", slaCalc.getId(), slaCalc.getActualEnd());
+        if (isEndMiss()) {
+            slaCalc.setSLAStatus(SLAStatus.MISS);
+        }
+        else {
+            slaCalc.setSLAStatus(SLAStatus.MET);
+        }
+        if (eventProc != 8 && slaCalc.getActualStart() != null) {
+            // Start event still pending and an expected start is configured.
+            if ((eventProc & 1) == 0 && slaCalc.getExpectedStart() != null) {
+                if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
+                    slaCalc.setEventStatus(EventStatus.START_MISS);
+                }
+                else {
+                    slaCalc.setEventStatus(EventStatus.START_MET);
+                }
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+            }
+            slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
+            // Duration event still pending.
+            if (((eventProc >> 1) & 1) == 0) {
+                processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
+            }
+        }
+        // End event still pending (a value below 4 can never be 8).
+        if (eventProc < 4) {
+            if (isEndMiss()) {
+                slaCalc.setEventStatus(EventStatus.END_MISS);
+            }
+            else {
+                slaCalc.setEventStatus(EventStatus.END_MET);
+            }
+            if (shouldAlert(slaCalc)) {
+                queueEvent(new SLACalcStatus(slaCalc));
+            }
+        }
+        slaCalc.setEventProcessed(8);
+    }
+
+    /**
+     * Process for running. Queues start / duration / end events that can already be decided for a job that has
+     * not ended and records the handled events in {@code eventProcessed}.
+     */
+    private void processForRunning() {
+        byte eventProc = slaCalc.getEventProcessed();
+
+        if (eventProc != 8 && slaCalc.getActualStart() != null) {
+            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
+        }
+        // Start event.
+        if (eventProc != 8 && (eventProc & 1) == 0) {
+            if (slaCalc.getExpectedStart() == null) {
+                // No expected start configured: nothing to report, just mark it handled.
+                eventProc++;
+            }
+            else if (slaCalc.getActualStart() != null) {
+                if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
+                    slaCalc.setEventStatus(EventStatus.START_MISS);
+                }
+                else {
+                    slaCalc.setEventStatus(EventStatus.START_MET);
+                }
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+                eventProc++;
+            }
+            else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
+                // Not started yet although the expected start has already passed.
+                slaCalc.setEventStatus(EventStatus.START_MISS);
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+                eventProc++;
+            }
+
+        }
+        // Duration event.
+        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+            if (slaCalc.getExpectedDuration() == -1) {
+                eventProc += 2;
+            }
+            else if (slaCalc.getActualStart() != null) {
+                if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
+                    slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                    eventProc += 2;
+                }
+            }
+        }
+        // End event.
+        if (eventProc < 4) {
+            if (slaCalc.getExpectedEnd() != null) {
+                if (slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
+                    slaCalc.setEventStatus(EventStatus.END_MISS);
+                    slaCalc.setSLAStatus(SLAStatus.MISS);
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                    eventProc += 4;
+                }
+            }
+            else {
+                eventProc += 4;
+            }
+        }
+        slaCalc.setEventProcessed(eventProc);
+    }
+
+    public boolean isEnded() {
+        return isEnded;
+    }
+
+    public void setEnded(boolean isEnded) {
+        this.isEnded = isEnded;
+    }
+
+    public boolean isEndMiss() {
+        return isEndMiss;
+    }
+
+    public void setEndMiss(boolean isEndMiss) {
+        this.isEndMiss = isEndMiss;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
new file mode 100644
index 0000000..0b4045a
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import java.util.Date;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.sla.SLASummaryBean;
+
+/**
+ * Base command that brings the SLA summary of a job up to date once the job has ended. It locks on the same
+ * entity key as {@link SLAJobEventXCommand} ({@code SLA_LOCK_PREFIX + jobId}).
+ */
+public abstract class SLAJobHistoryXCommand extends XCommand<Boolean> {
+
+    protected String jobId;
+
+    public SLAJobHistoryXCommand(String jobId) {
+        super("SLAJobHistoryXCommand", "SLAJobHistoryXCommand", 1);
+        this.jobId = jobId;
+
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return SLAJobEventXCommand.SLA_LOCK_PREFIX + jobId;
+    }
+
+    @Override
+    protected long getLockTimeOut() {
+        return 0L;
+    }
+
+    /**
+     * Updates the SLA summary when the job has ended.
+     *
+     * @return {@code true} if the job has ended and the summary update was attempted, {@code false} otherwise
+     * @throws CommandException if the summary update fails
+     */
+    @Override
+    protected Boolean execute() throws CommandException {
+        if (isJobEnded()) {
+            try {
+                updateSLASummary();
+            }
+            catch (XException e) {
+                throw new CommandException(e);
+            }
+            return true;
+        }
+        else {
+            LOG.debug("Job [{0}] is not finished", jobId);
+        }
+        return false;
+
+    }
+
+    /**
+     * Checks if is job ended.
+     *
+     * @return true, if is job ended
+     */
+    protected abstract boolean isJobEnded();
+
+    /**
+     * Update SLASummary
+     *
+     * @throws CommandException if the update fails
+     * @throws XException if the update fails
+     */
+    protected abstract void updateSLASummary() throws CommandException, XException;
+
+    /**
+     * Update sla summary. Does nothing when no summary exists for {@code id}, or when the stored summary already
+     * carries {@code status} and is marked as fully processed (eventProcessed == 8).
+     *
+     * @param id the id
+     * @param startTime the start time, may be {@code null}
+     * @param endTime the end time, may be {@code null}
+     * @param status the status
+     * @throws JPAExecutorException the JPA executor exception
+     */
+    protected void updateSLASummary(String id, Date startTime, Date endTime, String status) throws JPAExecutorException {
+        SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
+        // Check for a missing summary before touching it; the comparison below must not dereference null.
+        if (sla == null) {
+            return;
+        }
+        // Compare from the argument's side so a summary without a stored job status cannot cause an NPE.
+        if (status != null && status.equals(sla.getJobStatus()) && sla.getEventProcessed() == 8) {
+            LOG.debug("SLA job [{0}] is already updated, eventProc = [{1}], status = [{2}]", sla.getId(),
+                    sla.getEventProcessed(), sla.getJobStatus());
+            return;
+        }
+        sla.setActualStart(startTime);
+        sla.setActualEnd(endTime);
+        if (startTime != null && endTime != null) {
+            sla.setActualDuration(endTime.getTime() - startTime.getTime());
+        }
+        sla.setLastModifiedTime(new Date());
+        sla.setEventProcessed(8);
+        sla.setJobStatus(status);
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+                sla);
+        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", sla.getId(),
+                sla.getEventProcessed(), sla.getJobStatus());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
new file mode 100644
index 0000000..fef77ae
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA event command for a workflow action: copies the action's status and start/end times into {@code slaCalc}.
+ */
+public class SLAWorkflowActionJobEventXCommand extends SLAJobEventXCommand {
+    WorkflowActionBean wa;
+
+    public SLAWorkflowActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super(slaCalc, lockTimeOut);
+    }
+
+    /**
+     * Loads the workflow action whose id is {@code slaCalc.getId()}.
+     *
+     * @throws CommandException wrapping a {@link JPAExecutorException} from the lookup
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wa = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_FOR_SLA, slaCalc.getId());
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wa);
+
+    }
+
+    /**
+     * Treats the action as ended once it has an end time. The end is a miss when the action terminated with a
+     * failure or, if an expected end is set, when it ended after that expected end.
+     */
+    @Override
+    protected void updateJobInfo() {
+        if (wa.getEndTime() != null) {
+            setEnded(true);
+            // Without an expected end there is no end time to miss; only a failure counts then.
+            if (wa.isTerminalWithFailure()
+                    || (slaCalc.getExpectedEnd() != null
+                            && wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime())) {
+                setEndMiss(true);
+            }
+        }
+        slaCalc.setActualStart(wa.getStartTime());
+        slaCalc.setActualEnd(wa.getEndTime());
+        slaCalc.setJobStatus(wa.getStatusStr());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
new file mode 100644
index 0000000..7dc4a3c
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * History command for a workflow action: once the action is complete or has terminated with a failure, its SLA
+ * summary is updated with the action's start time, end time and status.
+ */
+public class SLAWorkflowActionJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    WorkflowActionBean wfAction = null;
+
+    public SLAWorkflowActionJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Reads the workflow action identified by {@code jobId} and hands it to {@link LogUtils#setLogInfo}.
+     *
+     * @throws CommandException wrapping a {@link JPAExecutorException} from the lookup
+     */
+    protected void loadState() throws CommandException {
+        try {
+            wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_COMPLETED, jobId);
+        }
+        catch (JPAExecutorException jpaEx) {
+            throw new CommandException(jpaEx);
+        }
+        LogUtils.setLogInfo(wfAction);
+    }
+
+    /**
+     * Pushes the loaded action's id, start time, end time and status into its SLA summary.
+     */
+    protected void updateSLASummary() throws XException {
+        updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
+    }
+
+    @Override
+    protected boolean isJobEnded() {
+        if (wfAction.isComplete()) {
+            return true;
+        }
+        return wfAction.isTerminalWithFailure();
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
new file mode 100644
index 0000000..9a72617
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA event command for a workflow job: copies the job's status and start/end times into {@code slaCalc}.
+ */
+public class SLAWorkflowJobEventXCommand extends SLAJobEventXCommand {
+    WorkflowJobBean wf;
+
+    public SLAWorkflowJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super(slaCalc, lockTimeOut);
+    }
+
+    /**
+     * Loads the workflow job whose id is {@code slaCalc.getId()}.
+     *
+     * @throws CommandException wrapping a {@link JPAExecutorException} from the lookup
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, slaCalc.getId());
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wf);
+
+    }
+
+    /**
+     * Treats the job as ended once it is in a terminal state. The end is a miss when the job was killed or
+     * failed or, if an expected end is set, when it ended after that expected end.
+     */
+    @Override
+    protected void updateJobInfo() {
+        if (wf.inTerminalState()) {
+            setEnded(true);
+            // Without an expected end there is no end time to miss; only KILLED / FAILED count then.
+            if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
+                    || (slaCalc.getExpectedEnd() != null
+                            && wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime())) {
+                setEndMiss(true);
+            }
+            slaCalc.setActualEnd(wf.getEndTime());
+        }
+        slaCalc.setActualStart(wf.getStartTime());
+        slaCalc.setJobStatus(wf.getStatusStr());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
new file mode 100644
index 0000000..79e45ee
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * History command for a workflow job: once the job is in a terminal state, its SLA summary is updated with the
+ * job's start time, end time and status.
+ */
+public class SLAWorkflowJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    WorkflowJobBean wfJob = null;
+
+    public SLAWorkflowJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Reads the workflow job identified by {@code jobId} and hands it to {@link LogUtils#setLogInfo}.
+     *
+     * @throws CommandException wrapping a {@link JPAExecutorException} from the lookup
+     */
+    protected void loadState() throws CommandException {
+        try {
+            wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
+        }
+        catch (JPAExecutorException jpaEx) {
+            throw new CommandException(jpaEx);
+        }
+        LogUtils.setLogInfo(wfJob);
+    }
+
+    /**
+     * Pushes the loaded job's id, start time, end time and status into its SLA summary.
+     */
+    protected void updateSLASummary() throws XException {
+        updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
+    }
+
+    @Override
+    protected boolean isJobEnded() {
+        final boolean terminal = wfJob.inTerminalState();
+        return terminal;
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
deleted file mode 100644
index 8a5b997..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * JPAExecutor to get attributes of CoordinatorActionBean required by SLAService on restart
- */
-public class CoordActionGetForSLAJPAExecutor implements JPAExecutor<CoordinatorActionBean> {
-
- private String coordActionId;
-
- public CoordActionGetForSLAJPAExecutor(String coordActionId) {
- ParamChecker.notNull(coordActionId, "coordActionId");
- this.coordActionId = coordActionId;
- }
-
- @Override
- public String getName() {
- return "CoordActionGetForSLAJPAExecutor";
- }
-
- @Override
- public CoordinatorActionBean execute(EntityManager em) throws JPAExecutorException {
- try {
- Query q = em.createNamedQuery("GET_COORD_ACTION_FOR_SLA");
- q.setParameter("id", coordActionId);
- Object[] obj = (Object[]) q.getSingleResult();
- CoordinatorActionBean caBean = getBeanForRunningCoordAction(obj);
- return caBean;
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
-
- }
-
- private CoordinatorActionBean getBeanForRunningCoordAction(Object[] arr) {
- CoordinatorActionBean bean = new CoordinatorActionBean();
- if (arr[0] != null) {
- bean.setId((String) arr[0]);
- }
- if (arr[1] != null) {
- bean.setJobId((String) arr[1]);
- }
- if (arr[2] != null) {
- bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
- }
- if (arr[3] != null) {
- bean.setExternalId((String) arr[3]);
- }
- if (arr[4] != null) {
- bean.setLastModifiedTime(DateUtils.toDate((Timestamp)arr[4]));
- }
- return bean;
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index 79ec28c..c0e6c19 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -59,7 +59,8 @@ public class CoordActionQueryExecutor extends
GET_TERMINATED_ACTION_IDS_FOR_DATES,
GET_ACTIVE_ACTIONS_FOR_DATES,
GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN,
- GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN
+ GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN,
+ GET_COORD_ACTION_FOR_SLA
};
private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -177,6 +178,7 @@ public class CoordActionQueryExecutor extends
switch (caQuery) {
case GET_COORD_ACTION:
case GET_COORD_ACTION_STATUS:
+ case GET_COORD_ACTION_FOR_SLA:
query.setParameter("id", parameters[0]);
break;
case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
@@ -330,6 +332,15 @@ public class CoordActionQueryExecutor extends
bean.setExternalId((String) arr[3]);
bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
break;
+ case GET_COORD_ACTION_FOR_SLA:
+ arr = (Object[]) ret;
+ bean = new CoordinatorActionBean();
+ bean.setId((String) arr[0]);
+ bean.setJobId((String) arr[1]);
+ bean.setStatusStr((String) arr[2]);
+ bean.setExternalId((String) arr[3]);
+ bean.setLastModifiedTime((Timestamp) arr[4]);
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
index 6663162..6ff9df8 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
@@ -37,7 +37,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
public enum SLASummaryQuery {
UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
- UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES,
UPDATE_SLA_SUMMARY_ALL,
UPDATE_SLA_SUMMARY_EVENTPROCESSED,
UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
@@ -72,14 +71,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
query.setParameter("actualEndTS", bean.getActualEndTimestamp());
query.setParameter("actualDuration", bean.getActualDuration());
break;
- case UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES:
- query.setParameter("jobId", bean.getId());
- query.setParameter("eventProcessed", bean.getEventProcessed());
- query.setParameter("actualStartTS", bean.getActualStartTimestamp());
- query.setParameter("actualEndTS", bean.getActualEndTimestamp());
- query.setParameter("actualDuration", bean.getActualDuration());
- query.setParameter("lastModifiedTS", bean.getLastModifiedTimestamp());
- break;
case UPDATE_SLA_SUMMARY_ALL:
query.setParameter("appName", bean.getAppName());
query.setParameter("appType", bean.getAppType().toString());
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
deleted file mode 100644
index 280294b..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Retrieve the workflow action bean for sla service
- */
-public class WorkflowActionGetForSLAJPAExecutor implements JPAExecutor<WorkflowActionBean> {
-
- private String wfActionId;
-
- public WorkflowActionGetForSLAJPAExecutor(String wfActionId) {
- ParamChecker.notNull(wfActionId, "wfActionId");
- this.wfActionId = wfActionId;
- }
-
- @Override
- public String getName() {
- return "WorkflowActionGetForSLAJPAExecutor";
- }
-
- @Override
- public WorkflowActionBean execute(EntityManager em) throws JPAExecutorException {
- try {
- Query q = em.createNamedQuery("GET_ACTION_FOR_SLA");
- q.setParameter("id", wfActionId);
- Object[] obj = (Object[]) q.getSingleResult();
- return getBeanFromArray(obj);
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- }
-
- private WorkflowActionBean getBeanFromArray(Object[] arr) {
- WorkflowActionBean wab = new WorkflowActionBean();
- if (arr[0] != null) {
- wab.setId((String) arr[0]);
- }
- if (arr[1] != null) {
- wab.setStatus(WorkflowAction.Status.valueOf((String) arr[1]));
- }
- if (arr[2] != null) {
- wab.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
- }
- if (arr[3] != null) {
- wab.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
- }
- return wab;
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 078fd40..f01f090 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -57,7 +57,8 @@ public class WorkflowActionQueryExecutor extends
GET_ACTION_COMPLETED,
GET_RUNNING_ACTIONS,
GET_PENDING_ACTIONS,
- GET_ACTIONS_FOR_WORKFLOW_RERUN
+ GET_ACTIONS_FOR_WORKFLOW_RERUN,
+ GET_ACTION_FOR_SLA
};
private static WorkflowActionQueryExecutor instance = new WorkflowActionQueryExecutor();
@@ -202,6 +203,7 @@ public class WorkflowActionQueryExecutor extends
case GET_ACTION_CHECK:
case GET_ACTION_END:
case GET_ACTION_COMPLETED:
+ case GET_ACTION_FOR_SLA:
query.setParameter("id", parameters[0]);
break;
case GET_RUNNING_ACTIONS:
@@ -363,6 +365,15 @@ public class WorkflowActionQueryExecutor extends
bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
bean.setType((String) arr[4]);
break;
+ case GET_ACTION_FOR_SLA:
+ bean = new WorkflowActionBean();
+ arr = (Object[]) ret;
+ bean.setId((String) arr[0]);
+ bean.setStatusStr((String) arr[1]);
+ bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+ bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+ break;
+
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
+ namedQuery.name());
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
deleted file mode 100644
index 774766f..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Retrieve the workflow job bean for sla service
- */
-public class WorkflowJobGetForSLAJPAExecutor implements JPAExecutor<WorkflowJobBean> {
-
- private String wfJobId;
-
- public WorkflowJobGetForSLAJPAExecutor(String wfJobId) {
- ParamChecker.notNull(wfJobId, "wfJobId");
- this.wfJobId = wfJobId;
- }
-
- @Override
- public String getName() {
- return "WorkflowJobGetForSLAJPAExecutor";
- }
-
- @Override
- public WorkflowJobBean execute(EntityManager em) throws JPAExecutorException {
- try {
- Query q = em.createNamedQuery("GET_WORKFLOW_FOR_SLA");
- q.setParameter("id", wfJobId);
- Object[] obj = (Object[]) q.getSingleResult();
- return getBeanFromArray(obj);
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- }
-
- private WorkflowJobBean getBeanFromArray(Object[] arr) {
- WorkflowJobBean wjb = new WorkflowJobBean();
- if (arr[0] != null) {
- wjb.setId((String) arr[0]);
- }
- if (arr[1] != null) {
- wjb.setStatus(WorkflowJob.Status.valueOf((String) arr[1]));
- }
- if (arr[2] != null) {
- wjb.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
- }
- if (arr[3] != null) {
- wjb.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
- }
- return wjb;
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
index ce108d5..13fa54d 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
@@ -28,7 +28,6 @@ import javax.persistence.Query;
import org.apache.oozie.BinaryBlob;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.StringBlob;
-import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -60,7 +59,8 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
GET_WORKFLOW_RESUME,
GET_WORKFLOW_STATUS,
GET_WORKFLOWS_PARENT_COORD_RERUN,
- GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN
+ GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN,
+ GET_WORKFLOW_FOR_SLA
};
private static WorkflowJobQueryExecutor instance = new WorkflowJobQueryExecutor();
@@ -171,6 +171,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
case GET_WORKFLOW_KILL:
case GET_WORKFLOW_RESUME:
case GET_WORKFLOW_STATUS:
+ case GET_WORKFLOW_FOR_SLA:
query.setParameter("id", parameters[0]);
break;
case GET_WORKFLOWS_PARENT_COORD_RERUN:
@@ -330,6 +331,14 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
bean.setId((String) arr[0]);
bean.setParentId((String) arr[1]);
break;
+ case GET_WORKFLOW_FOR_SLA:
+ bean = new WorkflowJobBean();
+ arr = (Object[]) ret;
+ bean.setId((String) arr[0]);
+ bean.setStatusStr((String) arr[1]);
+ bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+ bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+ namedQuery.name());
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
index 4246764..9d4dcd9 100644
--- a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
+++ b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
@@ -544,12 +544,19 @@ public class ConfigurationService implements Service, Instrumentable {
}
public static long getLong(String name) {
+ return getLong(name, ConfigUtils.LONG_DEFAULT);
+ }
+
+ public static long getLong(String name, long defultValue) {
Configuration conf = Services.get().getConf();
- return getLong(conf, name);
+ return getLong(conf, name, defultValue);
}
public static long getLong(Configuration conf, String name) {
- return conf.getLong(name, ConfigUtils.LONG_DEFAULT);
+ return getLong(conf, name, ConfigUtils.LONG_DEFAULT);
+ }
+ public static long getLong(Configuration conf, String name, long defultValue) {
+ return conf.getLong(name, defultValue);
}
public static Class<?>[] getClasses(String name) {
@@ -590,4 +597,5 @@ public class ConfigurationService implements Service, Instrumentable {
Configuration conf = Services.get().getConf();
return getPassword(conf, name, defaultValue);
}
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index 5c0cfd9..3a76dfe 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -25,11 +25,6 @@ import java.util.Map;
import org.apache.oozie.AppType;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.event.SLAEvent;
-import org.apache.oozie.lock.LockToken;
-import org.apache.oozie.service.JobsConcurrencyService;
-import org.apache.oozie.service.MemoryLocksService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
@@ -49,7 +44,6 @@ public class SLACalcStatus extends SLAEvent {
private long actualDuration = -1;
private Date lastModifiedTime;
private byte eventProcessed;
- private LockToken lock;
private XLog LOG;
@@ -293,53 +287,5 @@ public class SLACalcStatus extends SLAEvent {
public String getEntityKey() {
return SLA_ENTITYKEY_PREFIX + this.getId();
}
- /**
- * Obtain an exclusive lock on the {link #getEntityKey}.
- * <p>
- * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
- *
- * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
- */
- public void acquireLock() throws InterruptedException {
- // only get ZK lock when multiple servers running
- if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
- lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
- if (lock == null) {
- LOG.debug("Could not aquire lock for [{0}]", getEntityKey());
- }
- else {
- LOG.debug("Acquired lock for [{0}]", getEntityKey());
- }
- }
- else {
- lock = new DummyToken();
- }
- }
-
- private static class DummyToken implements LockToken {
- @Override
- public void release() {
- }
- }
-
- public boolean isLocked() {
- boolean locked = false;
- if(lock != null) {
- locked = true;
- }
- return locked;
- }
-
- public void releaseLock(){
- if (lock != null) {
- lock.release();
- lock = null;
- LOG.debug("Released lock for [{0}]", getEntityKey());
- }
- }
-
- public long getLockTimeOut() {
- return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000);
- }
}
[2/3] oozie git commit: OOZIE-2509 SLA job status can stuck in
running state
Posted by pu...@apache.org.
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index 42313fd..e8638a9 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -31,47 +31,28 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.JobEvent;
-import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
-import org.apache.oozie.lock.LockToken;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.JobsConcurrencyService;
-import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
@@ -80,8 +61,8 @@ import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.Pair;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.annotations.VisibleForTesting;
/**
* Implementation class for SLACalculator that calculates SLA related to
@@ -111,14 +92,14 @@ public class SLACalculatorMemory implements SLACalculator {
modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
loadOnRestart();
Runnable purgeThread = new HistoryPurgeWorker();
- // schedule runnable by default 1 day
+ // schedule runnable every 1 hour by default
Services.get()
.get(SchedulerService.class)
- .schedule(purgeThread, 86400, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 86400),
+ .schedule(purgeThread, 3600, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 3600),
SchedulerService.Unit.SEC);
}
- public class HistoryPurgeWorker implements Runnable {
+ public class HistoryPurgeWorker extends Thread {
public HistoryPurgeWorker() {
}
@@ -131,327 +112,88 @@ public class SLACalculatorMemory implements SLACalculator {
Iterator<String> jobItr = historySet.iterator();
while (jobItr.hasNext()) {
String jobId = jobItr.next();
-
- if (jobId.endsWith("-W")) {
- WorkflowJobBean wfJob = null;
- try {
- wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
- }
- catch (JPAExecutorException e) {
- if (e.getErrorCode().equals(ErrorCode.E0604)) {
- jobItr.remove();
- }
- else {
- LOG.info("Failed to fetch the workflow job: " + jobId, e);
- }
- }
- if (wfJob != null && wfJob.inTerminalState()) {
- try {
- updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime());
- jobItr.remove();
- }
- catch (JPAExecutorException e) {
- LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
- }
-
- }
- }
- else if (jobId.contains("-W@")) {
- WorkflowActionBean wfAction = null;
- try {
- wfAction = WorkflowActionQueryExecutor.getInstance().get(
- WorkflowActionQuery.GET_ACTION_COMPLETED, jobId);
- }
- catch (JPAExecutorException e) {
- if (e.getErrorCode().equals(ErrorCode.E0605)) {
- jobItr.remove();
- }
- else {
- LOG.info("Failed to fetch the workflow action: " + jobId, e);
- }
- }
- if (wfAction != null && (wfAction.isComplete() || wfAction.isTerminalWithFailure())) {
- try {
- updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime());
- jobItr.remove();
- }
- catch (JPAExecutorException e) {
- LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
- }
- }
- }
- else if (jobId.contains("-C@")) {
- CoordinatorActionBean cAction = null;
- try {
- cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId);
- }
- catch (JPAExecutorException e) {
- if (e.getErrorCode().equals(ErrorCode.E0605)) {
- jobItr.remove();
- }
- else {
- LOG.info("Failed to fetch the coord action: " + jobId, e);
- }
- }
- if (cAction != null && cAction.isTerminalStatus()) {
- try {
- updateSLASummaryForCoordAction(cAction);
- jobItr.remove();
- }
- catch (JPAExecutorException e) {
- XLog.getLog(SLACalculatorMemory.class).info(
- "Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
- }
-
+ LOG.debug(" Running HistoryPurgeWorker for " + jobId);
+ try {
+ boolean isDone = SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call();
+ if (isDone) {
+ LOG.debug("[{0}] job is finished and processed. Removing from history");
+ jobItr.remove();
}
}
- else if (jobId.endsWith("-C")) {
- CoordinatorJobBean cJob = null;
- try {
- cJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID,
- jobId);
- }
- catch (JPAExecutorException e) {
- if (e.getErrorCode().equals(ErrorCode.E0604)) {
- jobItr.remove();
- }
- else {
- LOG.info("Failed to fetch the coord job: " + jobId, e);
- }
+ catch (CommandException e) {
+ if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
+ LOG.warn("Job is not found in db: " + jobId, e);
+ jobItr.remove();
}
- if (cJob != null && cJob.isTerminalStatus()) {
- try {
- updateSLASummary(cJob.getId(), cJob.getStartTime(), cJob.getEndTime());
- jobItr.remove();
- }
- catch (JPAExecutorException e) {
- LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
- }
-
+ else {
+ LOG.error("Failed to fetch the job: " + jobId, e);
}
}
}
}
-
- private void updateSLASummary(String id, Date startTime, Date endTime) throws JPAExecutorException {
- SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
- if (sla != null) {
- sla.setActualStart(startTime);
- sla.setActualEnd(endTime);
- if (startTime != null && endTime != null) {
- sla.setActualDuration(endTime.getTime() - startTime.getTime());
- }
- sla.setLastModifiedTime(new Date());
- sla.setEventProcessed(8);
- SLASummaryQueryExecutor.getInstance().executeUpdate(
- SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, sla);
- }
- }
-
- private void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException {
- String wrkflowId = bean.getExternalId();
- if (wrkflowId != null) {
- WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get(
- WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId);
- if (wrkflow != null) {
- updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime());
- }
- }
- }
}
private void loadOnRestart() {
- boolean isJobModified = false;
+ long slaPendingCount = 0;
+ long statusPendingCount = 0;
+
try {
- long slaPendingCount = 0;
- long statusPendingCount = 0;
List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
modifiedAfter));
for (SLASummaryBean summaryBean : summaryBeans) {
String jobId = summaryBean.getId();
- LockToken lock = null;
- switch (summaryBean.getAppType()) {
- case COORDINATOR_ACTION:
- isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
- break;
- case WORKFLOW_ACTION:
- isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
- break;
- case WORKFLOW_JOB:
- isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
- break;
- default:
- break;
- }
- if (isJobModified) {
- try {
- boolean update = true;
- if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
- lock = Services
- .get()
- .get(MemoryLocksService.class)
- .getWriteLock(
- SLACalcStatus.SLA_ENTITYKEY_PREFIX + jobId,
- Services.get().getConf()
- .getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000));
- if (lock == null) {
- update = false;
- }
- }
- if (update) {
- summaryBean.setLastModifiedTime(new Date());
- SLASummaryQueryExecutor.getInstance().executeUpdate(
- SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
- }
- }
- catch (Exception e) {
- LOG.warn("Failed to load records for " + jobId, e);
- }
- finally {
- if (lock != null) {
- lock.release();
- lock = null;
- }
- }
- }
+
+ SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
+ SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
+
+ // Process missed jobs
try {
- if (summaryBean.getEventProcessed() == 7) {
- historySet.add(jobId);
- statusPendingCount++;
- }
- else if (summaryBean.getEventProcessed() <= 7) {
- SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
- SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
- SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
- slaMap.put(jobId, slaCalcStatus);
- slaPendingCount++;
- }
+ SLAXCommandFactory.getSLAEventXCommand(slaCalcStatus).call();
}
- catch (Exception e) {
- LOG.warn("Failed to fetch/update records for " + jobId, e);
+ catch (Throwable e) {
+ LOG.error("Error while updating job {0}", slaCalcStatus.getId(), e);
}
+ if (slaCalcStatus.getEventProcessed() == 7) {
+ historySet.add(jobId);
+ statusPendingCount++;
+ LOG.debug("Adding job [{0}] to historySet. EventProcessed is [{1}]", slaCalcStatus,
+ slaCalcStatus);
+ }
+ else if (slaCalcStatus.getEventProcessed() < 7) {
+ slaMap.put(jobId, slaCalcStatus);
+ slaPendingCount++;
+ LOG.debug("Adding job [{0}] to slamap. EventProcessed is [{1}]", slaCalcStatus,
+ slaCalcStatus);
+
+ }
}
LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount);
-
}
catch (Exception e) {
LOG.warn("Failed to retrieve SLASummary records on restart", e);
}
}
- private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId)
- throws JPAExecutorException {
- boolean isJobModified = false;
- CoordinatorActionBean coordAction = null;
- coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId));
- if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
- LOG.trace("Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is "
- + summaryBean.getJobStatus());
- isJobModified = true;
- summaryBean.setJobStatus(coordAction.getStatusStr());
- if (coordAction.isTerminalStatus()) {
- WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
- .getExternalId()));
- setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(),
- coordAction.getStatusStr());
- }
- else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) {
- WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
- .getExternalId()));
- setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
- }
- }
- return isJobModified;
- }
-
- private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId)
- throws JPAExecutorException {
- boolean isJobModified = false;
- WorkflowActionBean wfAction = null;
- wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId));
- if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
- LOG.trace("Workflow action status is " + wfAction.getStatusStr() + "and summary bean status is "
- + summaryBean.getJobStatus());
- isJobModified = true;
- summaryBean.setJobStatus(wfAction.getStatusStr());
- if (wfAction.inTerminalState()) {
- setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
- }
- else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
- setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime());
- }
- }
- return isJobModified;
- }
-
- private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId)
- throws JPAExecutorException {
- boolean isJobModified = false;
- WorkflowJobBean wfJob = null;
- wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
- if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
- LOG.trace("Workflow job status is " + wfJob.getStatusStr() + "and summary bean status is "
- + summaryBean.getJobStatus());
- isJobModified = true;
- summaryBean.setJobStatus(wfJob.getStatusStr());
- if (wfJob.inTerminalState()) {
- setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
- }
- else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
- setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
- }
- }
- return isJobModified;
- }
-
- private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) {
- byte eventProc = summaryBean.getEventProcessed();
- summaryBean.setEventProcessed(8);
- summaryBean.setActualStart(startTime);
- summaryBean.setActualEnd(endTime);
- long actualDuration = endTime.getTime() - startTime.getTime();
- summaryBean.setActualDuration(actualDuration);
- if (eventProc < 4) {
- if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || status.equals(WorkflowAction.Status.OK.name())
- || status.equals(CoordinatorAction.Status.SUCCEEDED.name())) {
- if (endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) {
- summaryBean.setSLAStatus(SLAStatus.MET);
- }
- else {
- summaryBean.setSLAStatus(SLAStatus.MISS);
- }
- }
- else {
- summaryBean.setSLAStatus(SLAStatus.MISS);
- }
- }
-
- }
-
- private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) {
- if (((eventProc & 1) == 0)) {
- eventProc += 1;
- summaryBean.setEventProcessed(eventProc);
- }
- if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
- summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
- }
- summaryBean.setActualStart(startTime);
- }
-
@Override
public int size() {
return slaMap.size();
}
+ @VisibleForTesting
+ public Set<String> getHistorySet(){
+ return historySet;
+ }
+
@Override
public SLACalcStatus get(String jobId) throws JPAExecutorException {
SLACalcStatus memObj;
memObj = slaMap.get(jobId);
if (memObj == null && historySet.contains(jobId)) {
- memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId),
- SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
+ memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
+ .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
}
return memObj;
}
@@ -460,13 +202,13 @@ public class SLACalculatorMemory implements SLACalculator {
SLACalcStatus memObj;
memObj = slaMap.get(jobId);
if (memObj == null) {
- memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId),
- SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
+ memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
+ .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
}
return memObj;
}
-
@Override
public Iterator<String> iterator() {
return slaMap.keySet().iterator();
@@ -488,12 +230,17 @@ public class SLACalculatorMemory implements SLACalculator {
*/
protected void updateJobSla(String jobId) throws Exception {
SLACalcStatus slaCalc = slaMap.get(jobId);
+
+ if (slaCalc == null) {
+ // job might be processed and removed from map by addJobStatus
+ return;
+ }
synchronized (slaCalc) {
- boolean change = false;
// get eventProcessed on DB for validation in HA
SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
byte eventProc = summaryBean.getEventProcessed();
+ slaCalc.setEventProcessed(eventProc);
if (eventProc >= 7) {
if (eventProc == 7) {
historySet.add(jobId);
@@ -503,127 +250,69 @@ public class SLACalculatorMemory implements SLACalculator {
}
else {
if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
- //Update last modified time.
+ // Update last modified time.
slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
reloadExpectedTimeAndConfig(slaCalc);
LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
}
- slaCalc.setEventProcessed(eventProc);
- SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
- // calculation w.r.t current time and status
- if ((eventProc & 1) == 0) { // first bit (start-processed) unset
- if (reg.getExpectedStart() != null) {
- if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
- confirmWithDB(slaCalc);
- eventProc = slaCalc.getEventProcessed();
- if (eventProc != 8 && (eventProc & 1) == 0) {
- // Some DB exception
- slaCalc.setEventStatus(EventStatus.START_MISS);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- eventProc++;
- }
- change = true;
- }
- }
- else {
- eventProc++; // disable further processing for optional start sla condition
- change = true;
- }
- }
- // check if second bit (duration-processed) is unset
- if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
- if (reg.getExpectedDuration() == -1) {
- eventProc += 2;
- change = true;
- }
- else if (slaCalc.getActualStart() != null) {
- if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
- .getActualStart().getTime())) {
- slaCalc.setEventProcessed(eventProc);
- confirmWithDB(slaCalc);
- eventProc = slaCalc.getEventProcessed();
- if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
- // Some DB exception
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- eventProc += 2;
- }
- change = true;
- }
- }
- }
- if (eventProc < 4) {
- if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
- slaCalc.setEventProcessed(eventProc);
- confirmWithDB(slaCalc);
- eventProc = slaCalc.getEventProcessed();
- change = true;
- }
- }
- if (change) {
+ if (isChanged(slaCalc)) {
+ LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(),
+ slaCalc.getEventProcessed(), slaCalc.getJobStatus());
try {
- boolean locked = true;
- slaCalc.acquireLock();
- locked = slaCalc.isLocked();
- if (locked) {
- // no more processing, no transfer to history set
- if (slaCalc.getEventProcessed() >= 8) {
- eventProc = 8;
- // Should not be > 8. But to handle any corner cases
- slaCalc.setEventProcessed(8);
- slaMap.remove(jobId);
- LOG.trace("Removed Job [{0}] from map after Event-processed=8", jobId);
- }
- else {
- slaCalc.setEventProcessed(eventProc);
- }
- writetoDB(slaCalc, eventProc);
- if (eventProc == 7) {
- historySet.add(jobId);
- slaMap.remove(jobId);
- LOG.trace("Removed Job [{0}] from map after Event-processed=7", jobId);
- }
- }
- }
- catch (InterruptedException e) {
- throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut());
+ SLAXCommandFactory.getSLAEventXCommand(slaCalc).call();
+ checkEventProc(slaCalc);
}
- finally {
- slaCalc.releaseLock();
+ catch (XException e) {
+ if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
+ LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId());
+ slaMap.remove(jobId);
+ }
}
}
+
}
}
}
- private void writetoDB(SLACalcStatus slaCalc, byte eventProc) throws JPAExecutorException {
- SLASummaryBean slaSummaryBean = new SLASummaryBean();
- slaSummaryBean.setId(slaCalc.getId());
- slaSummaryBean.setEventProcessed(eventProc);
- slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
- slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
- slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
- slaSummaryBean.setActualStart(slaCalc.getActualStart());
- slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
- slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
- slaSummaryBean.setLastModifiedTime(new Date());
+ private boolean isChanged(SLACalcStatus slaCalc) {
+ SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
+ byte eventProc = slaCalc.getEventProcessed();
- SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
- slaSummaryBean);
- LOG.trace("Stored SLA SummaryBean Job [{0}] with Event-processed=[{1}]", slaCalc.getId(),
- slaSummaryBean.getEventProcessed());
+ if ((eventProc & 1) == 0) { // first bit (start-processed) unset
+ if (reg.getExpectedStart() != null) {
+ if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
+ return true;
+ }
+ }
+ else {
+ return true;
+ }
+ }
+ if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+ if (reg.getExpectedDuration() == -1) {
+ return true;
+ }
+ else if (slaCalc.getActualStart() != null) {
+ if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
+ .getActualStart().getTime())) {
+ return true;
+ }
+ }
+ }
+ if (eventProc < 4) {
+ if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
+ return true;
+ }
+ }
+ return false;
}
@SuppressWarnings("rawtypes")
- private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList)
- throws JPAExecutorException {
+ private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) throws JPAExecutorException {
updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean()));
slaCalc.setLastModifiedTime(new Date());
- updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME, new SLASummaryBean(slaCalc)));
+ updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
+ new SLASummaryBean(slaCalc)));
}
@SuppressWarnings("rawtypes")
@@ -641,7 +330,6 @@ public class SLACalculatorMemory implements SLACalculator {
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
}
-
/**
* Periodically run by the SLAService worker threads to update SLA status by
* iterating through all the jobs in the map
@@ -770,410 +458,62 @@ public class SLACalculatorMemory implements SLACalculator {
@Override
public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
Date endTime) throws JPAExecutorException, ServiceException {
+ LOG.debug(
+ "Received addJobStatus request for job [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], "
+ + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime);
SLACalcStatus slaCalc = slaMap.get(jobId);
- SLASummaryBean slaInfo = null;
- boolean hasSla = false;
- if (slaCalc == null) {
- if (historySet.contains(jobId)) {
- slaInfo = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
- if (slaInfo == null) {
- throw new JPAExecutorException(ErrorCode.E0604, jobId);
- }
- slaInfo.setJobStatus(jobStatus);
- slaInfo.setActualStart(startTime);
- slaInfo.setActualEnd(endTime);
- if (endTime != null) {
- slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
- }
- slaInfo.setEventProcessed(8);
- historySet.remove(jobId);
- slaInfo.setLastModifiedTime(new Date());
- SLASummaryQueryExecutor.getInstance().executeUpdate(
- SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
- hasSla = true;
- }
- else if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
- // jobid might not exist in slaMap in HA Setting
- SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
- SLARegQuery.GET_SLA_REG_ALL, jobId);
- if (slaRegBean != null) { // filter out jobs picked by SLA job event listener
- // but not actually configured for SLA
- SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
- SLASummaryQuery.GET_SLA_SUMMARY, jobId);
- if (slaSummaryBean.getEventProcessed() < 7) {
- slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
- slaMap.put(jobId, slaCalc);
- }
- }
- }
- }
- if (slaCalc != null) {
- synchronized (slaCalc) {
- try {
- // only get ZK lock when multiple servers running
- boolean locked = true;
- slaCalc.acquireLock();
- locked = slaCalc.isLocked();
- if (locked) {
- // get eventProcessed on DB for validation in HA
- SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
- SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
- byte eventProc = summaryBean.getEventProcessed();
-
- if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
- //Update last modified time.
- slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
- reloadExpectedTimeAndConfig(slaCalc);
- LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
- }
- slaCalc.setEventProcessed(eventProc);
- slaCalc.setJobStatus(jobStatus);
- switch (jobEventStatus) {
- case STARTED:
- slaInfo = processJobStartSLA(slaCalc, startTime);
- break;
- case SUCCESS:
- slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
- break;
- case FAILURE:
- slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
- break;
- default:
- LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
- slaInfo = getSLASummaryBean(slaCalc);
- }
- if (slaCalc.getEventProcessed() == 7) {
- slaInfo.setEventProcessed(8);
- slaMap.remove(jobId);
- }
- slaInfo.setLastModifiedTime(new Date());
- SLASummaryQueryExecutor.getInstance().executeUpdate(
- SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
- hasSla = true;
- }
- }
- catch (InterruptedException e) {
- throw new ServiceException(ErrorCode.E0606, slaCalc.getEntityKey(), slaCalc.getLockTimeOut());
- }
- finally {
- slaCalc.releaseLock();
- }
+ if (slaCalc == null) {
+ SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_REG_ALL, jobId);
+ if (slaRegBean != null) { // filter out jobs picked by SLA job event listener
+ // but not actually configured for SLA
+ SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
+ SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+ slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
+ slaMap.put(jobId, slaCalc);
}
- LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
}
+ else {
+ SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
+ byte eventProc = summaryBean.getEventProcessed();
+ if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
+ // Update last modified time.
+ slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
+ reloadExpectedTimeAndConfig(slaCalc);
+ LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
- return hasSla;
- }
-
- /**
- * Process SLA for jobs that started running. Also update actual-start time
- *
- * @param slaCalc
- * @param actualStart
- * @return SLASummaryBean
- */
- private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) {
- slaCalc.setActualStart(actualStart);
- if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
- slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
- }
- SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
- Date expecStart = reg.getExpectedStart();
- byte eventProc = slaCalc.getEventProcessed();
- // set event proc here
- if (((eventProc & 1) == 0)) {
- if (expecStart != null) {
- if (actualStart.getTime() > expecStart.getTime()) {
- slaCalc.setEventStatus(EventStatus.START_MISS);
- }
- else {
- slaCalc.setEventStatus(EventStatus.START_MET);
- }
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
}
- eventProc += 1;
- slaCalc.setEventProcessed(eventProc);
- }
- return getSLASummaryBean(slaCalc);
- }
-
- /**
- * Process SLA for jobs that ended successfully. Also update actual-start
- * and end time
- *
- * @param slaCalc
- * @param actualStart
- * @param actualEnd
- * @return SLASummaryBean
- * @throws JPAExecutorException
- */
- private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
- SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
- slaCalc.setActualStart(actualStart);
- slaCalc.setActualEnd(actualEnd);
- long expectedDuration = reg.getExpectedDuration();
- long actualDuration = actualEnd.getTime() - actualStart.getTime();
- slaCalc.setActualDuration(actualDuration);
- //check event proc
- byte eventProc = slaCalc.getEventProcessed();
- if (((eventProc >> 1) & 1) == 0) {
- processDurationSLA(expectedDuration, actualDuration, slaCalc);
- eventProc += 2;
slaCalc.setEventProcessed(eventProc);
}
-
- if (eventProc < 4) {
- Date expectedEnd = reg.getExpectedEnd();
- if (actualEnd.getTime() > expectedEnd.getTime()) {
- slaCalc.setEventStatus(EventStatus.END_MISS);
- slaCalc.setSLAStatus(SLAStatus.MISS);
- }
- else {
- slaCalc.setEventStatus(EventStatus.END_MET);
- slaCalc.setSLAStatus(SLAStatus.MET);
- }
- eventProc += 4;
- slaCalc.setEventProcessed(eventProc);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ if (slaCalc != null) {
+ try {
+ SLAXCommandFactory.getSLAEventXCommand(slaCalc,
+ ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 1000)).call();
+ checkEventProc(slaCalc);
}
- }
- return getSLASummaryBean(slaCalc);
- }
-
- /**
- * Process SLA for jobs that ended in failure. Also update actual-start and
- * end time
- *
- * @param slaCalc
- * @param actualStart
- * @param actualEnd
- * @return SLASummaryBean
- * @throws JPAExecutorException
- */
- private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
- slaCalc.setActualStart(actualStart);
- slaCalc.setActualEnd(actualEnd);
- if (actualStart == null) { // job failed before starting
- if (slaCalc.getEventProcessed() < 4) {
- slaCalc.setEventStatus(EventStatus.END_MISS);
- slaCalc.setSLAStatus(SLAStatus.MISS);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- slaCalc.setEventProcessed(7);
- return getSLASummaryBean(slaCalc);
+ catch (XException e) {
+ LOG.error(e);
+ throw new ServiceException(e);
}
+ return true;
}
- SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
- long expectedDuration = reg.getExpectedDuration();
- long actualDuration = actualEnd.getTime() - actualStart.getTime();
- slaCalc.setActualDuration(actualDuration);
-
- byte eventProc = slaCalc.getEventProcessed();
- if (((eventProc >> 1) & 1) == 0) {
- if (expectedDuration != -1) {
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- }
- eventProc += 2;
- slaCalc.setEventProcessed(eventProc);
- }
- if (eventProc < 4) {
- slaCalc.setEventStatus(EventStatus.END_MISS);
- slaCalc.setSLAStatus(SLAStatus.MISS);
- eventProc += 4;
- slaCalc.setEventProcessed(eventProc);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- }
- return getSLASummaryBean(slaCalc);
- }
-
- private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) {
- SLASummaryBean slaSummaryBean = new SLASummaryBean();
- slaSummaryBean.setActualStart(slaCalc.getActualStart());
- slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
- slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
- slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
- slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
- slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
- slaSummaryBean.setId(slaCalc.getId());
- slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
- return slaSummaryBean;
- }
-
- private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
- if (expected != -1) {
- if (actual > expected) {
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- }
- else if (actual <= expected) {
- slaCalc.setEventStatus(EventStatus.DURATION_MET);
- }
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
+ else {
+ return false;
}
}
- /*
- * Confirm alerts against source of truth - DB. Also required in case of High Availability
- */
- private void confirmWithDB(SLACalcStatus slaCalc) {
- boolean ended = false, isEndMiss = false;
- try {
- switch (slaCalc.getAppType()) {
- case WORKFLOW_JOB:
- WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId()));
- if (wf.getEndTime() != null) {
- ended = true;
- if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
- || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
- isEndMiss = true;
- }
- }
- slaCalc.setActualStart(wf.getStartTime());
- slaCalc.setActualEnd(wf.getEndTime());
- slaCalc.setJobStatus(wf.getStatusStr());
- break;
- case WORKFLOW_ACTION:
- WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId()));
- if (wa.getEndTime() != null) {
- ended = true;
- if (wa.isTerminalWithFailure()
- || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
- isEndMiss = true;
- }
- }
- slaCalc.setActualStart(wa.getStartTime());
- slaCalc.setActualEnd(wa.getEndTime());
- slaCalc.setJobStatus(wa.getStatusStr());
- break;
- case COORDINATOR_ACTION:
- CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
- if (ca.isTerminalWithFailure()) {
- isEndMiss = ended = true;
- slaCalc.setActualEnd(ca.getLastModifiedTime());
- }
- if (ca.getExternalId() != null) {
- wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId()));
- if (wf.getEndTime() != null) {
- ended = true;
- if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
- isEndMiss = true;
- }
- }
- slaCalc.setActualEnd(wf.getEndTime());
- slaCalc.setActualStart(wf.getStartTime());
- }
- slaCalc.setJobStatus(ca.getStatusStr());
- break;
- default:
- LOG.debug("Unsupported App-type for SLA - " + slaCalc.getAppType());
- }
-
- byte eventProc = slaCalc.getEventProcessed();
- if (ended) {
- if (isEndMiss) {
- slaCalc.setSLAStatus(SLAStatus.MISS);
- }
- else {
- slaCalc.setSLAStatus(SLAStatus.MET);
- }
- if (slaCalc.getActualStart() != null) {
- if ((eventProc & 1) == 0) {
- if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
- slaCalc.setEventStatus(EventStatus.START_MISS);
- }
- else {
- slaCalc.setEventStatus(EventStatus.START_MET);
- }
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- }
- slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
- if (((eventProc >> 1) & 1) == 0) {
- processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
- }
- }
- if (eventProc < 4) {
- if (isEndMiss) {
- slaCalc.setEventStatus(EventStatus.END_MISS);
- }
- else {
- slaCalc.setEventStatus(EventStatus.END_MET);
- }
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- }
- slaCalc.setEventProcessed(8);
- }
- else {
- if (slaCalc.getActualStart() != null) {
- slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
- }
- if ((eventProc & 1) == 0) {
- if (slaCalc.getActualStart() != null) {
- if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
- slaCalc.setEventStatus(EventStatus.START_MISS);
- }
- else {
- slaCalc.setEventStatus(EventStatus.START_MET);
- }
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- eventProc++;
- }
- else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
- slaCalc.setEventStatus(EventStatus.START_MISS);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- eventProc++;
- }
- }
- if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != null
- && slaCalc.getExpectedDuration() != -1) {
- if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- eventProc += 2;
- }
- }
- if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
- slaCalc.setEventStatus(EventStatus.END_MISS);
- slaCalc.setSLAStatus(SLAStatus.MISS);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- eventProc += 4;
- }
- slaCalc.setEventProcessed(eventProc);
- }
+ private void checkEventProc(SLACalcStatus slaCalc){
+ byte eventProc = slaCalc.getEventProcessed();
+ if (slaCalc.getEventProcessed() >= 8) {
+ slaMap.remove(slaCalc.getId());
+ LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId());
}
- catch (Exception e) {
- LOG.warn("Error while confirming SLA against DB for jobid= " + slaCalc.getId() + ". Exception is "
- + e.getClass().getName() + ": " + e.getMessage());
- if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
- slaCalc.setEventStatus(EventStatus.END_MISS);
- slaCalc.setSLAStatus(SLAStatus.MISS);
- if (shouldAlert(slaCalc)) {
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4);
- }
+ if (eventProc == 7) {
+ historySet.add(slaCalc.getId());
+ slaMap.remove(slaCalc.getId());
+ LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId());
}
}
@@ -1235,21 +575,21 @@ public class SLACalculatorMemory implements SLACalculator {
return enableAlert(getSLAJobsforParents(parentJobIds));
}
-
@Override
public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
boolean isJobFound = false;
@SuppressWarnings("rawtypes")
List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
- for (String jobId : jobIds) {
- SLACalcStatus slaCalc = getSLACalcStatus(jobId);
- if (slaCalc != null) {
- slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true));
- updateDBSlaConfig(slaCalc, updateList);
- isJobFound = true;
- }
+ for (String jobId : jobIds) {
+ SLACalcStatus slaCalc = getSLACalcStatus(jobId);
+ if (slaCalc != null) {
+ slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
+ Boolean.toString(true));
+ updateDBSlaConfig(slaCalc, updateList);
+ isJobFound = true;
}
+ }
executeBatchQuery(updateList);
return isJobFound;
}
@@ -1260,19 +600,19 @@ public class SLACalculatorMemory implements SLACalculator {
}
@Override
- public boolean changeDefinition(List<Pair<String, Map<String,String>>> jobIdsSLAPair ) throws JPAExecutorException,
- ServiceException{
+ public boolean changeDefinition(List<Pair<String, Map<String, String>>> jobIdsSLAPair) throws JPAExecutorException,
+ ServiceException {
boolean isJobFound = false;
@SuppressWarnings("rawtypes")
List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
- for (Pair<String, Map<String,String>> jobIdSLAPair : jobIdsSLAPair) {
- SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist());
- if (slaCalc != null) {
- updateParams(slaCalc, jobIdSLAPair.getSecond());
- updateDBSlaExpectedValues(slaCalc, updateList);
- isJobFound = true;
- }
+ for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) {
+ SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist());
+ if (slaCalc != null) {
+ updateParams(slaCalc, jobIdSLAPair.getSecond());
+ updateDBSlaExpectedValues(slaCalc, updateList);
+ isJobFound = true;
}
+ }
executeBatchQuery(updateList);
return isJobFound;
}
@@ -1292,11 +632,7 @@ public class SLACalculatorMemory implements SLACalculator {
}
}
- private boolean shouldAlert(SLACalcStatus slaObj) {
- return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
- }
-
- private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException{
+ private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException {
List<String> childJobIds = new ArrayList<String>();
for (String jobId : parentJobIds) {
List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList(
@@ -1307,4 +643,5 @@ public class SLACalculatorMemory implements SLACalculator {
}
return childJobIds;
}
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
index ef1ea98..3b2cebd 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
@@ -52,8 +52,6 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId = :jobId"),
- @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration, w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"),
-
@NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES", query = "update SLASummaryBean w set w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration , w.lastModifiedTS = :lastModTime where w.jobId = :jobId"),
@NamedQuery(name = "UPDATE_SLA_SUMMARY_EVENTPROCESSED", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed where w.jobId = :jobId"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java b/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java
new file mode 100644
index 0000000..e6298f5
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.sla;
+
+import org.apache.oozie.command.sla.SLACoordActionJobEventXCommand;
+import org.apache.oozie.command.sla.SLACoordActionJobHistoryXCommand;
+import org.apache.oozie.command.sla.SLAJobEventXCommand;
+import org.apache.oozie.command.sla.SLAJobHistoryXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowActionJobEventXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowActionJobHistoryXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowJobEventXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowJobHistoryXCommand;
+
+/**
+ * A factory for creating SLACommand objects.
+ */
+public class SLAXCommandFactory {
+
+ /**
+ * Gets the SLA job history command.
+ *
+ * @param jobId the job id
+ * @return the SLA job history x command
+ */
+ public static SLAJobHistoryXCommand getSLAJobHistoryXCommand(String jobId) {
+ if (jobId.endsWith("-W")) {
+ return new SLAWorkflowJobHistoryXCommand(jobId);
+ }
+ else if (jobId.contains("-W@")) {
+ return new SLAWorkflowActionJobHistoryXCommand(jobId);
+
+ }
+ else if (jobId.contains("-C@")) {
+ return new SLACoordActionJobHistoryXCommand(jobId);
+ }
+
+ else {
+ return null;
+ }
+ }
+
+ /**
+ * Gets the SLA event command.
+ *
+ * @param slaCalc the sla calc
+ * @return the SLA event x command
+ */
+ public static SLAJobEventXCommand getSLAEventXCommand(SLACalcStatus slaCalc) {
+ return getSLAEventXCommand(slaCalc, 0);
+ }
+
+ /**
+ * Gets the SLA event x command.
+ *
+ * @param slaCalc the sla calc
+ * @param lockTimeOut the lock time out
+ * @return the SLA event x command
+ */
+ public static SLAJobEventXCommand getSLAEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+ if (slaCalc.getId().endsWith("-W")) {
+ return new SLAWorkflowJobEventXCommand(slaCalc, lockTimeOut);
+ }
+ else if (slaCalc.getId().contains("-W@")) {
+ return new SLAWorkflowActionJobEventXCommand(slaCalc, lockTimeOut);
+
+ }
+ else if (slaCalc.getId().contains("-C@")) {
+ return new SLACoordActionJobEventXCommand(slaCalc, lockTimeOut);
+ }
+
+ else {
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
index 52560e6..e239084 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
@@ -19,6 +19,7 @@
package org.apache.oozie.command.coord;
import java.util.Date;
+
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.WorkflowJobBean;
@@ -28,9 +29,10 @@ import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
@@ -84,7 +86,9 @@ public class TestCoordActionsKillXCommand extends XDataTestCase {
assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
sleep(100);
- WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ids[3]));
+ WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+ ids[3]);
+
assertEquals(WorkflowJob.Status.KILLED, wf.getStatus());
CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(ids[0]));
@@ -118,7 +122,8 @@ public class TestCoordActionsKillXCommand extends XDataTestCase {
assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
sleep(100);
- WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ids[3]));
+ WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+ ids[3]);
assertEquals(WorkflowJob.Status.KILLED, wf.getStatus());
CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(ids[0]));
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
index 5914b3b..1daecdc 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
@@ -131,7 +131,7 @@ public class TestSLASummaryQueryExecutor extends XDataTestCase {
bean.setActualDuration(endTime.getTime() - startTime.getTime());
bean.setLastModifiedTime(new Date());
bean.setEventProcessed(8);
- SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, bean);
+ SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, bean);
retBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, bean.getId());
assertEquals(bean.getActualStartTimestamp(), retBean.getActualStartTimestamp());
assertEquals(bean.getActualEndTimestamp(), retBean.getActualEndTimestamp());
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
index 795db37..3af263e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
@@ -101,17 +101,18 @@ public class TestHASLAService extends ZKXTestCase {
// Case 1 workflow job submitted to dummy server,
// but before start running, the dummy server is down
- createWorkflow("job-1");
- SLARegistrationBean sla1 = TestSLAService._createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ WorkflowJobBean wfJob1 = createWorkflow("job-1-W");
+ SLARegistrationBean sla1 = TestSLAService._createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
sla1.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2 hr before
sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); // 1 hr before
sla1.setExpectedDuration(10 * 60 * 1000); // 10 mins
dummyCalc.addRegistration(sla1.getId(), sla1);
+ dummyCalc.updateAllSlaStatus();
// Case 2. workflow job submitted to dummy server, start running,
// then the dummy server is down
- createWorkflow("job-2");
- SLARegistrationBean sla2 = TestSLAService._createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+ WorkflowJobBean wfJob2 = createWorkflow("job-2-W");
+ SLARegistrationBean sla2 = TestSLAService._createSLARegistration("job-2-W", AppType.WORKFLOW_JOB);
sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2hr before
sla2.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); // 1hr ahead
sla2.setExpectedDuration(10 * 60 * 1000); // 10 mins
@@ -138,6 +139,12 @@ public class TestHASLAService extends ZKXTestCase {
ehs.new EventWorker().run();
assertTrue(output.toString().contains(sla1.getId() + " Sla START - MISS!!!"));
+ wfJob1.setStatus(WorkflowJob.Status.SUCCEEDED);
+ wfJob1.setEndTime(new Date());
+ wfJob1.setStartTime(new Date());
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob1);
+
// Job 1 succeeded on the living server --> duration met and end miss
slaCalcMem.addJobStatus(sla1.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
new Date());
@@ -145,6 +152,12 @@ public class TestHASLAService extends ZKXTestCase {
assertTrue(output.toString().contains(sla1.getId() + " Sla DURATION - MET!!!"));
assertTrue(output.toString().contains(sla1.getId() + " Sla END - MISS!!!"));
+ wfJob2.setStatus(WorkflowJob.Status.SUCCEEDED);
+ wfJob2.setEndTime(new Date());
+ wfJob2.setStartTime(new Date());
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob2);
+
// Job 2 succeeded on the living server --> duration met and end met
slaCalcMem.addJobStatus(sla2.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
new Date());
@@ -161,13 +174,14 @@ public class TestHASLAService extends ZKXTestCase {
}
}
- public void updateCoordAction(String id, String status) throws JPAExecutorException {
+ public CoordinatorActionBean updateCoordAction(String id, String status) throws JPAExecutorException {
CoordinatorActionBean action = new CoordinatorActionBean();
action.setId(id);
action.setStatusStr(status);
action.setLastModifiedTime(new Date());
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
action);
+ return action; // hand back the same bean that was just passed to executeUpdate
}
public void testSLAUpdateWithHA() throws Exception {
@@ -187,8 +201,8 @@ public class TestHASLAService extends ZKXTestCase {
createDBEntry(id3, expectedStartTS, expectedEndTS1);
createDBEntry(id4, expectedStartTS, expectedEndTS1);
// Coord Action of jobs 5-6 already started and currently running (to test history set)
- createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2);
- createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2);
+ createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2, 1);
+ createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2, 1);
SLAService slas = Services.get().get(SLAService.class);
SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
@@ -198,7 +212,10 @@ public class TestHASLAService extends ZKXTestCase {
while (itr.hasNext()) {
slaMapKeys.add(itr.next());
}
- assertEquals(6, slaMapKeys.size());
+ // 4 jobs: expected end is not yet reached
+ // 2 jobs: have an end miss and are waiting for the job to complete
+ assertEquals(4, slaMapKeys.size());
+ assertEquals(2, slaCalcMem.getHistorySet().size());
DummyZKOozie dummyOozie_1 = null;
try {
@@ -214,8 +231,8 @@ public class TestHASLAService extends ZKXTestCase {
while (itr.hasNext()) {
slaMapKeys.add(itr.next());
}
- assertEquals(6, slaMapKeys.size());
-
+ assertEquals(4, slaMapKeys.size());
+ assertEquals(2, dummySlaCalcMem.getHistorySet().size());
// Coord Action 1,3 run and update status on *non-dummy* server
updateCoordAction(id1, "RUNNING");
slaCalcMem
@@ -314,14 +331,12 @@ public class TestHASLAService extends ZKXTestCase {
public void testNoDuplicateEventsInHA() throws Exception {
String id1 = "0000001-130521183438837-oozie-test-C@1";
- Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // get MISS
- Date expectedEndTS = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // get MISS
- createDBEntry(id1, expectedStartTS, expectedEndTS);
SLAService slas = Services.get().get(SLAService.class);
SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
slaCalcMem.init(Services.get().getConf()); // loads the job in sla map
+
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
EventQueue ehs_q = ehs.getEventQueue();
@@ -336,20 +351,30 @@ public class TestHASLAService extends ZKXTestCase {
dummyEhs.init(Services.get());
EventQueue dummyEhs_q = dummyEhs.getEventQueue();
+ Date expectedStartTS = new Date(System.currentTimeMillis() + 2 * 3600 * 1000); // get MISS
+ Date expectedEndTS = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // get MISS
+ SLASummaryBean sla = createDBEntryForStarted(id1, expectedStartTS, expectedEndTS, 0);
+ sla.setExpectedDuration(-1);
+ sla.setLastModifiedTime(new Date());
+ SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
+ sla);
+
// Action started on Server 1
updateCoordAction(id1, "RUNNING");
+
slaCalcMem
.addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null);
+ assertEquals(1, ehs_q.size());
SLACalcStatus s1 = (SLACalcStatus) ehs_q.poll();
- assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus());
+ assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus());
// Action ended on Server 2
updateCoordAction(id1, "FAILED");
dummySlaCalcMem.addJobStatus(id1, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, new Date(
System.currentTimeMillis() - 1800 * 1000),
new Date());
- dummyEhs_q.poll(); // getting rid of the duration_miss event
SLACalcStatus s2 = (SLACalcStatus) dummyEhs_q.poll();
+
assertEquals(SLAStatus.MISS, s2.getSLAStatus());
slaCalcMem.updateAllSlaStatus();
@@ -464,7 +489,8 @@ public class TestHASLAService extends ZKXTestCase {
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
}
- private void createDBEntryForStarted(String actionId, Date expectedStartTS, Date expectedEndTS) throws Exception {
+ private SLASummaryBean createDBEntryForStarted(String actionId, Date expectedStartTS, Date expectedEndTS,
+ int eventProcessed) throws Exception {
ArrayList<JsonBean> insertList = new ArrayList<JsonBean>();
Date modTime = new Date();
WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
@@ -494,7 +520,7 @@ public class TestHASLAService extends ZKXTestCase {
sla.setAppType(AppType.COORDINATOR_ACTION);
sla.setJobStatus("RUNNING");
sla.setSLAStatus(SLAStatus.IN_PROCESS);
- sla.setEventProcessed(1);
+ sla.setEventProcessed(eventProcessed);
sla.setLastModifiedTime(modTime);
sla.setExpectedStart(expectedStartTS);
sla.setActualStart(expectedStartTS);
@@ -508,9 +534,10 @@ public class TestHASLAService extends ZKXTestCase {
insertList.add(sla);
insertList.add(reg);
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+ return sla;
}
- private void createWorkflow(String id) throws Exception {
+ private WorkflowJobBean createWorkflow(String id) throws Exception {
List<JsonBean> insertList = new ArrayList<JsonBean>();
WorkflowJobBean workflow = new WorkflowJobBean();
workflow.setId(id);
@@ -519,6 +546,7 @@ public class TestHASLAService extends ZKXTestCase {
workflow.setSlaXml("<sla></sla>");
insertList.add(workflow);
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+ return workflow;
}
public static class DummySLAEventListener extends SLAEventListener {