You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ry...@apache.org on 2014/06/18 09:19:58 UTC
[1/2] OOZIE-1678 HA support for SLA (ryota)
Repository: oozie
Updated Branches:
refs/heads/master d5f1e3864 -> 4e015d45e
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
index 2e170a4..fe7fa97 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
@@ -21,7 +21,9 @@ import java.util.Date;
import javax.persistence.EntityManager;
import javax.persistence.Query;
+
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -49,6 +51,19 @@ public class TestSLASummaryQueryExecutor extends XDataTestCase {
public void testGetQuery() throws Exception {
EntityManager em = jpaService.getEntityManager();
SLASummaryBean bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
+ // GET_SLA_SUMMARY
+ Query query = SLASummaryQueryExecutor.getInstance().getSelectQuery(SLASummaryQuery.GET_SLA_SUMMARY, em,
+ bean.getId());
+ assertEquals(query.getParameterValue("id"), bean.getId());
+ // GET_SLA_SUMMARY_EVENTPROCESSED
+ query = SLASummaryQueryExecutor.getInstance().getSelectQuery(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED,
+ em, bean.getId());
+ assertEquals(query.getParameterValue("id"), bean.getId());
+ }
+
+ public void testUpdateQuery() throws Exception {
+ EntityManager em = jpaService.getEntityManager();
+ SLASummaryBean bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
// UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES
Query query = SLASummaryQueryExecutor.getInstance().getUpdateQuery(
@@ -107,12 +122,38 @@ public class TestSLASummaryQueryExecutor extends XDataTestCase {
assertEquals(bean.getActualEndTimestamp(), retBean.getActualEndTimestamp());
assertEquals(SLAStatus.MET, retBean.getSLAStatus());
assertEquals(createdTime, retBean.getCreatedTime()); // Created time should not be updated
+
+ //test UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES
+ bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
+ bean.setActualStart(startTime);
+ bean.setActualStart(endTime);
+ bean.setActualDuration(endTime.getTime() - startTime.getTime());
+ bean.setLastModifiedTime(new Date());
+ bean.setEventProcessed(8);
+ SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, bean);
+ retBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, bean.getId());
+ assertEquals(bean.getActualStartTimestamp(), retBean.getActualStartTimestamp());
+ assertEquals(bean.getActualEndTimestamp(), retBean.getActualEndTimestamp());
+ assertEquals(bean.getActualDuration(), retBean.getActualDuration());
+ assertEquals(bean.getLastModifiedTimestamp(), retBean.getLastModifiedTimestamp());
+ assertEquals(bean.getEventProcessed(), retBean.getEventProcessed());
}
public void testGet() throws Exception {
- // TODO
+ SLASummaryBean bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
+ //GET_SLA_REG_ON_RESTART
+ SLASummaryBean sBean = SLASummaryQueryExecutor.getInstance().get(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, bean.getId());
+ assertEquals(bean.getEventProcessed(), sBean.getEventProcessed());
}
+ public void testGetValue() throws Exception {
+ SLASummaryBean bean = addRecordToSLASummaryTable("test-sla-summary", SLAStatus.IN_PROCESS);
+ //GET_SLA_REG_ON_RESTART
+ Object ret = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, bean.getId());
+ assertEquals(bean.getEventProcessed(), ((Byte)ret).byteValue());
+ }
public void testGetList() throws Exception {
// TODO
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
index 9e0f03b..7a10685 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
@@ -202,6 +202,15 @@ public class TestWorkflowJobQueryExecutor extends XDataTestCase {
assertNull(retBean.getProtoActionConf());
assertNull(retBean.getSlaXml());
assertNull(retBean.getConf());
+ // GET_WORKFLOW_START_END_TIME
+ retBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, bean.getId());
+ assertEquals(bean.getId(), retBean.getId());
+ assertEquals(bean.getStartTime().getTime(), retBean.getStartTime().getTime());
+ assertEquals(bean.getEndTime().getTime(), retBean.getEndTime().getTime());
+ assertNull(retBean.getWorkflowInstance());
+ assertNull(retBean.getProtoActionConf());
+ assertNull(retBean.getSlaXml());
+ assertNull(retBean.getConf());
// GET_WORKFLOW_USER_GROUP
retBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_USER_GROUP, bean.getId());
assertEquals(bean.getUser(), retBean.getUser());
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
index ea36720..7257433 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
@@ -60,7 +60,6 @@ public class TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor ext
days = 1;
assertEquals(0, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob1.getEndTime());
- System.out.println("Debug: days " + days);
assertEquals(1, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
WorkflowJobBean subwfJob2 = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED, wfJobId);
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
new file mode 100644
index 0000000..eec5369
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
@@ -0,0 +1,446 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.AppType;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.sla.SLACalculatorMemory;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.sla.SLASummaryBean;
+import org.apache.oozie.sla.TestSLAService;
+import org.apache.oozie.sla.listener.SLAEventListener;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+public class TestHASLAService extends ZKXTestCase {
+
+ private static StringBuilder output = new StringBuilder();
+
+ protected void setUp() throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService,"
+ + "org.apache.oozie.sla.service.SLAService");
+ conf.setClass(EventHandlerService.CONF_LISTENERS, DummySLAEventListener.class, SLAEventListener.class);
+ conf.setLong(SLAService.CONF_JOB_EVENT_LATENCY, 0);
+ // manually do check in this test
+ conf.setInt(SLAService.CONF_SLA_CHECK_INITIAL_DELAY, 100000);
+ conf.setInt(SLAService.CONF_SLA_CHECK_INTERVAL, 100000);
+ conf.setInt(EventHandlerService.CONF_WORKER_THREADS, 0);
+ super.setUp(conf);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ public void testSLAFailOverWithHA() throws Exception {
+
+ SLAService slas = Services.get().get(SLAService.class);
+ SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
+ EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+
+ // start another dummy oozie instance (dummy sla and eventhandler
+ // services)
+ DummyZKOozie dummyOozie_1 = null;
+ try {
+ dummyOozie_1 = new DummyZKOozie("a", "http://blah");
+ DummySLACalculatorMemory dummyCalc = new DummySLACalculatorMemory();
+ EventHandlerService dummyEhs = new EventHandlerService();
+ dummyCalc.setEventHandlerService(dummyEhs);
+ dummyEhs.init(Services.get());
+ dummyCalc.init(Services.get().getConf());
+
+ // Case 1 workflow job submitted to dummy server,
+ // but before start running, the dummy server is down
+ createWorkflow("job-1");
+ SLARegistrationBean sla1 = TestSLAService._createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ sla1.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2 hr before
+ sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); // 1 hr before
+ sla1.setExpectedDuration(10 * 60 * 1000); // 10 mins
+ dummyCalc.addRegistration(sla1.getId(), sla1);
+
+ // Case 2. workflow job submitted to dummy server, start running,
+ // then the dummy server is down
+ createWorkflow("job-2");
+ SLARegistrationBean sla2 = TestSLAService._createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+ sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2hr before
+ sla2.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); // 1hr ahead
+ sla2.setExpectedDuration(10 * 60 * 1000); // 10 mins
+ dummyCalc.addRegistration(sla2.getId(), sla2);
+ dummyCalc.addJobStatus(sla2.getId(), WorkflowJob.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
+ new Date());
+
+ dummyCalc.updateAllSlaStatus();
+ dummyEhs.new EventWorker().run();
+ assertTrue(output.toString().contains(sla2.getId() + " Sla START - MISS!!!"));
+
+ // suppose dummy Server is down
+ dummyCalc.clear();
+ dummyCalc = null;
+ dummyOozie_1.teardown();
+
+ slaCalcMem.updateAllSlaStatus();
+
+ // Job 1 started running on the living server --> start miss
+ slaCalcMem.addJobStatus(sla1.getId(), WorkflowJob.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
+ new Date());
+ // job 1 is added to slamap of living oozie server
+ assertNotNull(slaCalcMem.get(sla1.getId()));
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains(sla1.getId() + " Sla START - MISS!!!"));
+
+ // Job 1 succeeded on the living server --> duration met and end miss
+ slaCalcMem.addJobStatus(sla1.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
+ new Date());
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains(sla1.getId() + " Sla DURATION - MET!!!"));
+ assertTrue(output.toString().contains(sla1.getId() + " Sla END - MISS!!!"));
+
+ // Job 2 succeeded on the living server --> duration met and end met
+ slaCalcMem.addJobStatus(sla2.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
+ new Date());
+ // eventProc >= 7(already processed duration/end met), should be removed from slaMap
+ assertNull(slaCalcMem.get(sla2.getId()));
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains(sla2.getId() + " Sla DURATION - MET!!!"));
+ assertTrue(output.toString().contains(sla2.getId() + " Sla END - MET!!!"));
+ }
+ finally {
+ if (dummyOozie_1 != null) {
+ dummyOozie_1.teardown();
+ }
+ }
+ }
+
+ public void updateCoordAction(String id, String status) throws JPAExecutorException {
+ CoordinatorActionBean action = new CoordinatorActionBean();
+ action.setId(id);
+ action.setStatusStr(status);
+ action.setLastModifiedTime(new Date());
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
+ action);
+ }
+
+ public void testSLAUpdateWithHA() throws Exception {
+
+ String id1 = "0000000-130521183438837-oozie-test-C@1";
+ String id2 = "0000001-130521183438837-oozie-test-C@1";
+ String id3 = "0000002-130521183438837-oozie-test-C@1";
+ String id4 = "0000003-130521183438837-oozie-test-C@1";
+ String id5 = "0000004-130521183438837-oozie-test-C@1";
+ String id6 = "0000005-130521183438837-oozie-test-C@1";
+ Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000);
+ Date expectedEndTS1 = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // 1 hour ahead
+ Date expectedEndTS2 = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // 1 hour passed
+ // Coord Action 1-4 not started yet
+ createDBEntry(id1, expectedStartTS, expectedEndTS1);
+ createDBEntry(id2, expectedStartTS, expectedEndTS1);
+ createDBEntry(id3, expectedStartTS, expectedEndTS1);
+ createDBEntry(id4, expectedStartTS, expectedEndTS1);
+ // Coord Action 5-6 already started and currently running (to test history set)
+ createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2);
+ createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2);
+
+ SLAService slas = Services.get().get(SLAService.class);
+ SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator();
+ EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+ slaCalcMem.init(Services.get().getConf());
+ List<String> slaMapKeys = new ArrayList<String>();
+ Iterator<String> itr = slaCalcMem.iterator();
+ while (itr.hasNext()) {
+ slaMapKeys.add(itr.next());
+ }
+ assertEquals(6, slaMapKeys.size());
+
+ DummyZKOozie dummyOozie_1 = null;
+ try {
+ // start another dummy oozie instance (dummy sla and event handler services)
+ dummyOozie_1 = new DummyZKOozie("a", "http://blah");
+ DummySLACalculatorMemory dummySlaCalcMem = new DummySLACalculatorMemory();
+ EventHandlerService dummyEhs = new EventHandlerService();
+ dummySlaCalcMem.setEventHandlerService(dummyEhs);
+ dummyEhs.init(Services.get());
+ dummySlaCalcMem.init(Services.get().getConf());
+ slaMapKeys = new ArrayList<String>();
+ itr = dummySlaCalcMem.iterator();
+ while (itr.hasNext()) {
+ slaMapKeys.add(itr.next());
+ }
+ assertEquals(6, slaMapKeys.size());
+
+ // Coord Action 1,3 run and update status on non-dummy server
+ updateCoordAction(id1, "RUNNING");
+ slaCalcMem
+ .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null);
+ updateCoordAction(id3, "FAILED");
+ slaCalcMem.addJobStatus(id3, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, null, new Date());
+
+ // Coord Action 2,4 run and update status on dummy server
+ updateCoordAction(id2, "RUNNING");
+ dummySlaCalcMem.addJobStatus(id2, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(),
+ null);
+ updateCoordAction(id4, "FAILED");
+ dummySlaCalcMem.addJobStatus(id4, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, null,
+ new Date());
+
+ // Both servers iterate SlaMap (updateAllSlaStatus)
+ slaCalcMem.updateAllSlaStatus();
+ dummySlaCalcMem.updateAllSlaStatus();
+
+ // SlaMap on both Servers synced
+ SLACalcStatus sla1_nodummy = slaCalcMem.get(id1);
+ SLACalcStatus sla1_dummy = dummySlaCalcMem.get(id1);
+ SLACalcStatus sla2_nodummy = slaCalcMem.get(id2);
+ SLACalcStatus sla2_dummy = dummySlaCalcMem.get(id2);
+ assertEquals(1, sla1_nodummy.getEventProcessed());
+ assertEquals(1, sla1_dummy.getEventProcessed());
+ assertEquals(1, sla2_dummy.getEventProcessed());
+ assertEquals(1, sla2_nodummy.getEventProcessed());
+ assertFalse(slaCalcMem.isJobIdInSLAMap(id3));
+ assertFalse(dummySlaCalcMem.isJobIdInSLAMap(id3));
+ assertFalse(slaCalcMem.isJobIdInSLAMap(id4));
+ assertFalse(dummySlaCalcMem.isJobIdInSLAMap(id4));
+
+ Byte eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id3);
+ assertEquals(8, eventProc.intValue());
+ eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id4);
+ assertEquals(8, eventProc.intValue());
+
+ // Action 5 was processed as END_MISS in updateAllSlaStatus, put into history set
+ assertTrue(slaCalcMem.isJobIdInHistorySet(id5));
+ assertTrue(dummySlaCalcMem.isJobIdInHistorySet(id6));
+ // Action 6 was processed as END_MISS in updateAllSlaStatus, put into history set
+ assertTrue(slaCalcMem.isJobIdInHistorySet(id5));
+ assertTrue(dummySlaCalcMem.isJobIdInHistorySet(id6));
+
+ eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id5);
+ assertEquals(7, eventProc.intValue());
+ eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id6);
+ assertEquals(7, eventProc.intValue());
+
+ // Action 1 Succeeded on non-dummy server
+ updateCoordAction(id1, "SUCCEEDED");
+ slaCalcMem.addJobStatus(id1, CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(
+ System.currentTimeMillis() - 1800 * 1000), new Date());
+
+ // Action 2 Succeeded on dummy server
+ updateCoordAction(id2, "SUCCEEDED");
+ dummySlaCalcMem.addJobStatus(id2, CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(
+ System.currentTimeMillis() - 1800 * 1000), new Date());
+
+ // Both servers iterate SlaMap (updateAllSlaStatus)
+ slaCalcMem.updateAllSlaStatus();
+ dummySlaCalcMem.updateAllSlaStatus();
+
+ // Action 1, 2 are removed from both servers
+ assertNull(slaCalcMem.get(id1));
+ assertNull(dummySlaCalcMem.get(id1));
+ assertNull(slaCalcMem.get(id2));
+ assertNull(dummySlaCalcMem.get(id2));
+ eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id1);
+ assertEquals(8, eventProc.intValue());
+ eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id2);
+ assertEquals(8, eventProc.intValue());
+
+ // Test HistoryPurgeWorker purges Action 5,6 from history set
+ updateCoordAction(id5, "SUCCEEDED");
+ slaCalcMem.new HistoryPurgeWorker().run();
+ assertFalse(slaCalcMem.isJobIdInHistorySet(id5));
+ updateCoordAction(id6, "SUCCEEDED");
+ dummySlaCalcMem.new HistoryPurgeWorker().run();
+ assertFalse(slaCalcMem.isJobIdInHistorySet(id6));
+
+ }
+ finally {
+ if (dummyOozie_1 != null) {
+ dummyOozie_1.teardown();
+ }
+ }
+ }
+
+ private void createDBEntry(String actionId, Date expectedStartTS, Date expectedEndTS) throws Exception {
+ ArrayList<JsonBean> insertList = new ArrayList<JsonBean>();
+ CoordinatorActionBean coordAction = new CoordinatorActionBean();
+ Date modTime = new Date(System.currentTimeMillis() - 1000 * 3600 * 2);
+ coordAction.setId(actionId);
+ coordAction.setJobId(actionId.split("@", -1)[0]);
+ coordAction.setStatusStr("WAITING");
+ coordAction.setLastModifiedTime(modTime);
+
+ CoordinatorJobBean coordJob = new CoordinatorJobBean();
+ coordJob.setId(actionId.split("@", -1)[0]);
+ coordJob.setUser("dummy");
+ coordJob.setAppName("dummy");
+ coordJob.setStatusStr("RUNNING");
+ coordJob.setAppNamespace("dummy");
+
+ SLASummaryBean sla = new SLASummaryBean();
+ sla.setId(actionId);
+ sla.setAppType(AppType.COORDINATOR_ACTION);
+ sla.setJobStatus("WAITING");
+ sla.setSLAStatus(SLAStatus.NOT_STARTED);
+ sla.setEventProcessed(0);
+ sla.setLastModifiedTime(modTime);
+ sla.setExpectedStart(expectedStartTS);
+ sla.setExpectedEnd(expectedEndTS);
+ sla.setExpectedDuration(10 * 60 * 1000);
+ SLARegistrationBean reg = new SLARegistrationBean();
+ reg.setId(actionId);
+ insertList.add(coordAction);
+ insertList.add(coordJob);
+ insertList.add(sla);
+ insertList.add(reg);
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+ }
+
+ private void createDBEntryForStarted(String actionId, Date expectedStartTS, Date expectedEndTS) throws Exception {
+ ArrayList<JsonBean> insertList = new ArrayList<JsonBean>();
+ Date modTime = new Date();
+ WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
+ actionId);
+ wf.setStatus(wf.getStatus());
+ wf.setStartTime(expectedStartTS);
+ wf.setLastModifiedTime(modTime);
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(
+ WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wf);
+
+ CoordinatorActionBean coordAction = new CoordinatorActionBean();
+ coordAction.setId(actionId);
+ coordAction.setJobId(actionId.split("@", -1)[0]);
+ coordAction.setStatusStr("RUNNING");
+ coordAction.setLastModifiedTime(modTime);
+ coordAction.setExternalId(wf.getId());
+
+ CoordinatorJobBean coordJob = new CoordinatorJobBean();
+ coordJob.setId(actionId.split("@", -1)[0]);
+ coordJob.setUser("dummy");
+ coordJob.setAppName("dummy");
+ coordJob.setStatusStr("RUNNING");
+ coordJob.setAppNamespace("dummy");
+
+ SLASummaryBean sla = new SLASummaryBean();
+ sla.setId(actionId);
+ sla.setAppType(AppType.COORDINATOR_ACTION);
+ sla.setJobStatus("RUNNING");
+ sla.setSLAStatus(SLAStatus.IN_PROCESS);
+ sla.setEventProcessed(1);
+ sla.setLastModifiedTime(modTime);
+ sla.setExpectedStart(expectedStartTS);
+ sla.setActualStart(expectedStartTS);
+ sla.setExpectedEnd(expectedEndTS);
+ sla.setExpectedDuration(10 * 60 * 1000);
+ SLARegistrationBean reg = new SLARegistrationBean();
+ reg.setId(actionId);
+ insertList.add(coordAction);
+
+ insertList.add(coordJob);
+ insertList.add(sla);
+ insertList.add(reg);
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+ }
+
+ private void createWorkflow(String id) throws Exception {
+ List<JsonBean> insertList = new ArrayList<JsonBean>();
+ WorkflowJobBean workflow = new WorkflowJobBean();
+ workflow.setId(id);
+ workflow.setStatusStr("PREP");
+ workflow.setStartTime(new Date());
+ workflow.setSlaXml("<sla></sla>");
+ insertList.add(workflow);
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+ }
+
+ public static class DummySLAEventListener extends SLAEventListener {
+
+ @Override
+ public void onStartMet(SLAEvent sla) {
+ output.append(sla.getId() + " Sla START - MET!!!");
+ }
+
+ @Override
+ public void onStartMiss(SLAEvent sla) {
+ output.append(sla.getId() + " Sla START - MISS!!!");
+ }
+
+ @Override
+ public void onEndMet(SLAEvent sla) {
+ output.append(sla.getId() + " Sla END - MET!!!");
+ }
+
+ @Override
+ public void onEndMiss(SLAEvent sla) {
+ output.append(sla.getId() + " Sla END - MISS!!!");
+ }
+
+ @Override
+ public void onDurationMet(SLAEvent sla) {
+ output.append(sla.getId() + " Sla DURATION - MET!!!");
+ }
+
+ @Override
+ public void onDurationMiss(SLAEvent sla) {
+ output.append(sla.getId() + " Sla DURATION - MISS!!!");
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+}
+
+class DummySLACalculatorMemory extends SLACalculatorMemory {
+ public void setEventHandlerService(EventHandlerService ehs) {
+ this.eventHandler = ehs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
index 9270aa2..2ae35ad 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
@@ -31,7 +31,6 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
@@ -88,9 +87,7 @@ public class TestSLACalculationJPAExecutor extends XDataTestCase {
List<JsonBean> insertList = new ArrayList<JsonBean>();
insertList.add(bean2);
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
-
- SLASummaryGetJPAExecutor readCmd2 = new SLASummaryGetJPAExecutor(wfId);
- SLASummaryBean sBean = jpaService.execute(readCmd2);
+ SLASummaryBean sBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, wfId);
assertEquals(wfId, sBean.getId());
assertEquals("RUNNING", sBean.getJobStatus());
assertEquals(EventStatus.START_MISS, sBean.getEventStatus());
@@ -136,8 +133,7 @@ public class TestSLACalculationJPAExecutor extends XDataTestCase {
List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, bean2);
- SLASummaryGetJPAExecutor readCmd2 = new SLASummaryGetJPAExecutor(wfId);
- SLASummaryBean sBean = jpaService.execute(readCmd2);
+ SLASummaryBean sBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, wfId);
// check updated + original fields
assertEquals(wfId, sBean.getId());
assertEquals(EventStatus.DURATION_MISS, sBean.getEventStatus());
@@ -195,14 +191,13 @@ public class TestSLACalculationJPAExecutor extends XDataTestCase {
FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
// Check whether transactions are rolled back or not
- SLASummaryGetJPAExecutor readCmd = new SLASummaryGetJPAExecutor(wfId1);
- SLASummaryBean sBean = jpaService.execute(readCmd);
+ SLASummaryBean sBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, wfId1);
+
// isSlaProcessed should NOT be changed to 1
// actualEnd should be null as before
assertNull(sBean.getActualEnd());
- SLASummaryGetJPAExecutor readCmd1 = new SLASummaryGetJPAExecutor(wfId2);
- sBean = jpaService.execute(readCmd1);
+ sBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, wfId2);
assertNull(sBean); //new bean should not have been inserted due to rollback
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
index 9a16722..438f2c2 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
@@ -20,7 +20,9 @@ package org.apache.oozie.sla;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
@@ -33,16 +35,17 @@ import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -75,6 +78,18 @@ public class TestSLACalculatorMemory extends XDataTestCase {
super.tearDown();
}
+ private void createWorkflow(List<String> idList) throws Exception {
+ List<JsonBean> insertList = new ArrayList<JsonBean>();
+ for (String id : idList) {
+ WorkflowJobBean workflow = new WorkflowJobBean();
+ workflow.setId(id);
+ workflow.setStatusStr("PREP");
+ workflow.setStartTime(new Date());
+ insertList.add(workflow);
+ }
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+ }
+
@Test
public void testLoadOnRestart() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
@@ -85,6 +100,11 @@ public class TestSLACalculatorMemory extends XDataTestCase {
String jobId2 = slaRegBean2.getId();
SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3", AppType.WORKFLOW_JOB);
String jobId3 = slaRegBean3.getId();
+ List<String> idList = new ArrayList<String>();
+ idList.add(slaRegBean1.getId());
+ idList.add(slaRegBean2.getId());
+ idList.add(slaRegBean3.getId());
+ createWorkflow(idList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
slaRegBean1.setAppName("app-name");
@@ -166,7 +186,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
assertEquals(2, slaCalcMemory.size()); // 2 out of 3 jobs in map
slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2011-03-09"), sdf.parse("2011-04-09"));
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId3));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId3);
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(sdf.parse("2011-03-09"), slaSummary.getActualStart());
assertEquals(sdf.parse("2011-04-09"), slaSummary.getActualEnd());
@@ -193,7 +213,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,slaSummaryBean);
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
// Simulate a lost success event
WorkflowJobBean wjb = new WorkflowJobBean();
@@ -209,7 +229,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
// As job succeeded, it should not be in memory
assertEquals(0, slaCalcMemory.size());
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("job-1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType());
@@ -237,7 +257,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory.init(new Configuration(false));
assertEquals(0, slaCalcMemory.size());
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("FAILED", slaSummary.getJobStatus());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
@@ -303,7 +323,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
// As job succeeded, it should not be in memory
assertEquals(0, slaCalcMemory.size());
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("job-1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType());
@@ -354,7 +374,8 @@ public class TestSLACalculatorMemory extends XDataTestCase {
// As job succeeded, it should not be in memory
assertEquals(0, slaCalcMemory.size());
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
+
assertEquals("job-1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType());
@@ -379,12 +400,12 @@ public class TestSLACalculatorMemory extends XDataTestCase {
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(jobId, slaRegBean);
assertEquals(1, slaCalcMemory.size());
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
assertEquals("PREP", slaSummary.getJobStatus());
slaCalcMemory.updateJobSla(jobId);
assertEquals(2, ehs.getEventQueue().size());
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// both start miss and end miss (101)
assertEquals(5, slaSummary.getEventProcessed());
assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
@@ -397,14 +418,14 @@ public class TestSLACalculatorMemory extends XDataTestCase {
sdf.parse("2012-01-01"), null);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUSPENDED.toString(), EventStatus.SUSPEND,
sdf.parse("2012-01-01"), null);
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(WorkflowJob.Status.SUSPENDED.toString(), slaSummary.getJobStatus());
assertEquals(5, slaSummary.getEventProcessed());
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
assertEquals(3, ehs.getEventQueue().size());
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// All events processed and actual times stored (1000)
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
@@ -432,7 +453,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory.addRegistration(jobId, slaRegBean);
assertEquals(1, slaCalcMemory.size());
slaCalcMemory.updateJobSla(jobId);
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// Duration bit should be processed as expected duration is not set
assertEquals(3, slaSummary.getEventProcessed());
// check only start event in queue
@@ -446,7 +467,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// all should be processed
assertEquals(8, slaSummary.getEventProcessed());
// check only end event is in queue
@@ -466,7 +487,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null,
sdf.parse("2012-01-02"));
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// Actual start null, so all events processed
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(1, ehs.getEventQueue().size());
@@ -491,13 +512,15 @@ public class TestSLACalculatorMemory extends XDataTestCase {
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
slaCalcMemory.updateJobSla(jobId);
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+
assertEquals(1, slaSummary.getEventProcessed());
assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
new Date(System.currentTimeMillis()), null);
slaCalcMemory.updateJobSla(jobId);
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+
assertEquals(1, slaSummary.getEventProcessed());
assertEquals(SLAStatus.IN_PROCESS, slaSummary.getSLAStatus());
assertEquals(WorkflowJob.Status.RUNNING.toString(), slaSummary.getJobStatus());
@@ -518,19 +541,20 @@ public class TestSLACalculatorMemory extends XDataTestCase {
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
slaCalcMemory.updateJobSla(jobId);
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
- slaRegBean = jpaService.execute(new SLARegistrationGetJPAExecutor(jobId));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+ slaRegBean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
assertNotNull(slaRegBean.getCreatedTimestamp());
assertEquals(slaRegBean.getCreatedTimestamp(), slaSummary.getCreatedTimestamp());
// Only end sla should be processed (100)
assertEquals(4, slaSummary.getEventProcessed());
slaCalcMemory.updateJobSla(jobId);
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(4, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+
// Only Duration sla should be processed as end is already processed
// (110)
assertEquals(6, slaSummary.getEventProcessed());
@@ -538,7 +562,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
// Recieve start event
assertTrue(slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)));
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// Start event received so all bits should be processed (111)
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
@@ -563,7 +587,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
System.currentTimeMillis() - 3600 * 1000), null);
slaCalcMemory.updateJobSla(jobId);
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// The actual end times are not stored, but sla's processed so (111)
assertEquals(7, slaSummary.getEventProcessed());
// Moved from map to history set
@@ -571,11 +595,10 @@ public class TestSLACalculatorMemory extends XDataTestCase {
// Add terminal state event so actual end time is stored
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, new Date(
System.currentTimeMillis() - 3600 * 1000), new Date(System.currentTimeMillis()));
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// The actual times are stored, so event processed(1000)
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(3, ehs.getEventQueue().size());
-
}
private SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
@@ -585,4 +608,31 @@ public class TestSLACalculatorMemory extends XDataTestCase {
return bean;
}
+ public void testHistoryPurge() throws Exception{
+ EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+ SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
+ slaCalcMemory.init(new Configuration(false));
+ WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+ SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
+ Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000);
+ slaRegBean.setExpectedStart(startTime); // 1 hour back
+ slaRegBean.setExpectedDuration(1000);
+ slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
+ String jobId = slaRegBean.getId();
+ slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
+ slaCalcMemory.updateJobSla(jobId);
+ slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
+ System.currentTimeMillis() - 3600 * 1000), null);
+ slaCalcMemory.updateJobSla(jobId);
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+ // The actual end times are not stored, but sla's processed so (111)
+ assertEquals(7, slaSummary.getEventProcessed());
+ assertTrue(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
+ job1.setStatusStr("SUCCEEDED");
+ job1.setLastModifiedTime(new Date());
+ WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, job1);
+ slaCalcMemory.new HistoryPurgeWorker().run();
+ assertFalse(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
index cb4e434..31c2a47 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
@@ -53,11 +53,12 @@ import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -242,7 +243,7 @@ public class TestSLAEventGeneration extends XDataTestCase {
assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
// assert for values in summary bean to be reset
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals( 0, slaSummary.getEventProcessed());
assertEquals(-1, slaSummary.getActualDuration());
assertNull(slaSummary.getActualStart());
@@ -443,6 +444,10 @@ public class TestSLAEventGeneration extends XDataTestCase {
new CoordActionStartXCommand(actionId, getTestUser(), appName, jobId).call();
slaEvent = slas.getSLACalculator().get(actionId);
slaEvent.setEventProcessed(0); //resetting for testing sla event
+ SLASummaryBean suBean = new SLASummaryBean();
+ suBean.setId(actionId);
+ suBean.setEventProcessed(0);
+ SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_EVENTPROCESSED, suBean);
ehs.new EventWorker().run();
slaEvent = skipToSLAEvent();
assertEquals(actionId, slaEvent.getId());
@@ -576,6 +581,10 @@ public class TestSLAEventGeneration extends XDataTestCase {
new StartXCommand(jobId).call();
slaEvent = slas.getSLACalculator().get(jobId);
slaEvent.setEventProcessed(0); //resetting to receive sla events
+ SLASummaryBean suBean = new SLASummaryBean();
+ suBean.setId(jobId);
+ suBean.setEventProcessed(0);
+ SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_EVENTPROCESSED, suBean);
waitForEventGeneration(200);
ehs.new EventWorker().run();
waitForEventGeneration(300);
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
index 3ce86ab..b033d4d 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
@@ -31,9 +31,9 @@ import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.event.listener.JobEventListener;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.service.EventHandlerService;
-import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.listener.SLAJobEventListener;
import org.apache.oozie.sla.service.SLAService;
@@ -114,7 +114,7 @@ public class TestSLAJobEventListener extends XTestCase {
"coord-app-name1", actualStart, actualEnd);
listener.onCoordinatorJobEvent(cje);
- SLASummaryBean summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("cj1"));
+ SLASummaryBean summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1");
// check that end and duration sla has been calculated
assertEquals(6, summary.getEventProcessed());
@@ -130,7 +130,7 @@ public class TestSLAJobEventListener extends XTestCase {
cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.KILLED, "user1",
"coord-app-name1", null, actualEnd, null);
listener.onCoordinatorActionEvent(cae);
- summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("ca1"));
+ summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "ca1");
// check that all events are processed
assertEquals(8, summary.getEventProcessed());
assertEquals(EventStatus.END_MISS, summary.getEventStatus());
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java b/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
index 40376a5..1c65d12 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
@@ -25,7 +25,8 @@ import org.apache.oozie.AppType;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLARegistrationBean;
@@ -55,8 +56,7 @@ public class TestSLARegistrationGetJPAExecutor extends XDataTestCase {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- SLARegistrationGetJPAExecutor readCmd = new SLARegistrationGetJPAExecutor(jobId);
- SLARegistrationBean bean = jpaService.execute(readCmd);
+ SLARegistrationBean bean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
assertEquals(jobId, bean.getId());
assertEquals(AppType.WORKFLOW_JOB, bean.getAppType());
assertEquals(current, bean.getExpectedStart());
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java b/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
deleted file mode 100644
index cd2fbae..0000000
--- a/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.sla;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.oozie.client.rest.JsonBean;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.SLARegistrationBean;
-import org.apache.oozie.test.XDataTestCase;
-
-public class TestSLARegistrationGetRecordsOnRestartJPAExecutor extends XDataTestCase {
- Services services;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- services = new Services();
- services.init();
- }
-
- @Override
- protected void tearDown() throws Exception {
- services.destroy();
- super.tearDown();
- }
-
- public void testSLARegistrationGetRecordsOnRestart() throws Exception {
- Date current = new Date();
- final String jobId = "0000000-" + current.getTime() + "-TestSLARegGetRestartJPAExecutor-W";
- SLARegistrationBean reg = new SLARegistrationBean();
- reg.setId(jobId);
- reg.setNotificationMsg("dummyMessage");
- reg.setUpstreamApps("upApps");
- reg.setAlertEvents("miss");
- reg.setAlertContact("abc@y.com");
- reg.setJobData("jobData");
- JPAService jpaService = Services.get().get(JPAService.class);
- List<JsonBean> insert = new ArrayList<JsonBean>();
- insert.add(reg);
- BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insert, null, null);
- assertNotNull(jpaService);
- SLARegistrationGetOnRestartJPAExecutor readCmd = new SLARegistrationGetOnRestartJPAExecutor(jobId);
- SLARegistrationBean bean = jpaService.execute(readCmd);
- assertEquals("dummyMessage", bean.getNotificationMsg());
- assertEquals ("upApps", bean.getUpstreamApps());
- assertEquals ("miss", bean.getAlertEvents());
- assertEquals ("abc@y.com", bean.getAlertContact());
- assertEquals ("jobData", bean.getJobData());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
index 291d850..205bcd1 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
@@ -29,12 +29,14 @@ import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -244,7 +246,7 @@ public class TestSLAService extends XDataTestCase {
assertEventNoDuplicates(output.toString(), action3.getId() + " Sla END - MET!!!");
// negative on MISS after DB check, updated with actual times
- SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(job2.getId()));
+ SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, job2.getId());
assertEquals(job2.getStartTime(), slaSummary.getActualStart());
assertEquals(job2.getEndTime(), slaSummary.getActualEnd());
assertEquals(job2.getEndTime().getTime() - job2.getStartTime().getTime(), slaSummary.getActualDuration());
@@ -255,7 +257,7 @@ public class TestSLAService extends XDataTestCase {
assertNull(slas.getSLACalculator().get(job2.getId())); //removed from memory
// positives but also updated with actual times immediately after DB check
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(action2.getId()));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action2.getId());
extWf = jpaService.execute(new WorkflowJobGetJPAExecutor(action2.getExternalId()));
assertEquals(extWf.getStartTime(), slaSummary.getActualStart());
assertEquals(extWf.getEndTime(), slaSummary.getActualEnd());
@@ -266,7 +268,7 @@ public class TestSLAService extends XDataTestCase {
assertEquals(8, slaSummary.getEventProcessed());
assertNull(slas.getSLACalculator().get(action2.getId())); //removed from memory
- slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(action1.getId()));
+ slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action1.getId());
extWf = jpaService.execute(new WorkflowJobGetJPAExecutor(action1.getExternalId()));
assertEquals(extWf.getStartTime(), slaSummary.getActualStart());
assertEquals(extWf.getEndTime(), slaSummary.getActualEnd());
@@ -292,19 +294,18 @@ public class TestSLAService extends XDataTestCase {
Element eSla = XmlUtils.parseXml(slaXml);
SLAOperations.createSlaRegistrationEvent(eSla, "job-id1", "parent-id1", AppType.WORKFLOW_JOB, getTestUser(),
"test-appname", log, false);
- SLARegistrationBean reg = Services.get().get(JPAService.class)
- .execute(new SLARegistrationGetJPAExecutor("job-id1"));
+ SLARegistrationBean reg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, "job-id1");
assertEquals("END_MISS", reg.getAlertEvents());
}
- static SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
+ public static SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
SLARegistrationBean bean = new SLARegistrationBean();
bean.setId(jobId);
bean.setAppType(appType);
return bean;
}
- private void assertEventNoDuplicates(String outputStr, String eventMsg) {
+ public static void assertEventNoDuplicates(String outputStr, String eventMsg) {
int index = outputStr.indexOf(eventMsg);
assertTrue(index != -1);
//No duplicates
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java b/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
index 3a9e72e..bd543c2 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
@@ -22,16 +22,11 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-import org.apache.oozie.AppType;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.test.XDataTestCase;
public class TestSLASummaryGetOnRestartJPAExecutor extends XDataTestCase {
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
index 3d37d48..946a4b9 100644
--- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
@@ -18,7 +18,9 @@
package org.apache.oozie.test;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
@@ -33,6 +35,7 @@ import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.FixedJsonInstanceSerializer;
import org.apache.oozie.util.ZKUtils;
+import org.apache.hadoop.conf.Configuration;
/**
* Provides a version of XTestCase that also runs a ZooKeeper server and provides some utilities for interacting and simulating ZK
@@ -63,6 +66,23 @@ public abstract class ZKXTestCase extends XDataTestCase {
protected void setUp() throws Exception {
super.setUp();
new Services().init();
+ setUpZK();
+ }
+
+ protected void setUp(Configuration conf) throws Exception {
+ super.setUp();
+ Services services = new Services();
+ if(conf != null && conf.size()>0){
+ for (Iterator<Entry<String, String>> itr = (Iterator<Entry<String, String>>) conf.iterator(); itr.hasNext();) {
+ Entry<String, String> entry = itr.next();
+ services.getConf().set(entry.getKey(), entry.getValue());
+ }
+ }
+ services.init();
+ setUpZK();
+ }
+
+ private void setUpZK() throws Exception {
zkServer = setupZKServer();
Services.get().getConf().set("oozie.zookeeper.connection.string", zkServer.getConnectString());
setSystemProperty("oozie.instance.id", ZK_ID);
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/resources/coord-action-sla.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/coord-action-sla.xml b/core/src/test/resources/coord-action-sla.xml
index 4893c61..ac93862 100644
--- a/core/src/test/resources/coord-action-sla.xml
+++ b/core/src/test/resources/coord-action-sla.xml
@@ -16,7 +16,7 @@
limitations under the License.
-->
<coordinator-app name="test-coord-sla" frequency="${coord:days(1)}"
- start="2009-01-01T08:01Z" end="2009-01-02T08:01Z"
+ start="2009-01-02T08:01Z" end="2009-01-03T08:00Z"
timezone="America/Los_Angeles"
xmlns="uri:oozie:coordinator:0.4"
xmlns:sla="uri:oozie:sla:0.2">
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index e8fa1a8..4ea53ec 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1678 HA support for SLA (ryota)
OOZIE-1685 Oozie doesn’t process correctly workflows with a non-default name node (benjzh via rohini)
OOZIE-1875 Add "NONE" to coordinator job execution_order (bzhang)
OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes (rkanter)
[2/2] git commit: OOZIE-1678 HA support for SLA (ryota)
Posted by ry...@apache.org.
OOZIE-1678 HA support for SLA (ryota)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4e015d45
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4e015d45
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4e015d45
Branch: refs/heads/master
Commit: 4e015d45ee43008a4b82e9d9cf0b89b0ed894f88
Parents: d5f1e38
Author: egashira <ry...@yahoo.com>
Authored: Wed Jun 18 00:15:04 2014 -0700
Committer: egashira <ry...@yahoo.com>
Committed: Wed Jun 18 00:15:04 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/oozie/WorkflowJobBean.java | 2 +
.../command/coord/CoordChangeXCommand.java | 12 +-
.../executor/jpa/BundleActionQueryExecutor.java | 7 +
.../executor/jpa/BundleJobQueryExecutor.java | 6 +
.../executor/jpa/CoordActionQueryExecutor.java | 5 +
.../executor/jpa/CoordJobQueryExecutor.java | 5 +
.../oozie/executor/jpa/QueryExecutor.java | 3 +
.../jpa/SLARegistrationQueryExecutor.java | 55 ++-
.../executor/jpa/SLASummaryQueryExecutor.java | 57 ++-
.../jpa/WorkflowActionQueryExecutor.java | 6 +
.../executor/jpa/WorkflowJobQueryExecutor.java | 14 +
.../jpa/sla/SLARegistrationGetJPAExecutor.java | 66 ---
.../SLARegistrationGetOnRestartJPAExecutor.java | 74 ---
.../jpa/sla/SLASummaryGetJPAExecutor.java | 66 ---
.../org/apache/oozie/service/JPAService.java | 1 -
.../oozie/service/JobsConcurrencyService.java | 1 +
.../oozie/service/ZKJobsConcurrencyService.java | 1 +
.../org/apache/oozie/sla/SLACalcStatus.java | 64 +++
.../apache/oozie/sla/SLACalculatorMemory.java | 439 ++++++++++++++----
.../org/apache/oozie/sla/SLAOperations.java | 6 +-
.../org/apache/oozie/sla/SLASummaryBean.java | 9 +-
.../apache/oozie/sla/service/SLAService.java | 9 +-
.../command/coord/TestCoordChangeXCommand.java | 18 +-
.../jpa/TestSLARegistrationQueryExecutor.java | 51 ++-
.../jpa/TestSLASummaryQueryExecutor.java | 43 +-
.../jpa/TestWorkflowJobQueryExecutor.java | 9 +
...ForPurgeFromWorkflowParentIdJPAExecutor.java | 1 -
.../apache/oozie/service/TestHASLAService.java | 446 +++++++++++++++++++
.../sla/TestSLACalculationJPAExecutor.java | 15 +-
.../oozie/sla/TestSLACalculatorMemory.java | 100 +++--
.../oozie/sla/TestSLAEventGeneration.java | 13 +-
.../oozie/sla/TestSLAJobEventListener.java | 8 +-
.../sla/TestSLARegistrationGetJPAExecutor.java | 6 +-
...istrationGetRecordsOnRestartJPAExecutor.java | 72 ---
.../org/apache/oozie/sla/TestSLAService.java | 19 +-
.../TestSLASummaryGetOnRestartJPAExecutor.java | 5 -
.../java/org/apache/oozie/test/ZKXTestCase.java | 20 +
core/src/test/resources/coord-action-sla.xml | 2 +-
release-log.txt | 1 +
39 files changed, 1275 insertions(+), 462 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
index 0b8ee96..5fbee82 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
@@ -89,6 +89,8 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_WORKFLOW_STARTTIME", query = "select w.id, w.startTimestamp from WorkflowJobBean w where w.id = :id"),
+ @NamedQuery(name = "GET_WORKFLOW_START_END_TIME", query = "select w.id, w.startTimestamp, w.endTimestamp from WorkflowJobBean w where w.id = :id"),
+
@NamedQuery(name = "GET_WORKFLOW_USER_GROUP", query = "select w.user, w.group from WorkflowJobBean w where w.id = :id"),
@NamedQuery(name = "GET_WORKFLOW_SUSPEND", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
index 436b999..805856c 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
import org.apache.commons.lang.StringUtils;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
@@ -40,10 +41,12 @@ import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.service.JPAService;
@@ -266,12 +269,13 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
if (SLAService.isEnabled()) {
Services.get().get(SLAService.class).removeRegistration(actionId);
}
- SLARegistrationBean slaReg = jpaService.execute(new SLARegistrationGetJPAExecutor(actionId));
+ SLARegistrationBean slaReg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, actionId);
if (slaReg != null) {
LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId());
deleteList.add(slaReg);
}
- SLASummaryBean slaSummaryBean = jpaService.execute(new SLASummaryGetJPAExecutor(actionId));
+ SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
+ SLASummaryQuery.GET_SLA_SUMMARY, actionId);
if (slaSummaryBean != null) {
LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId());
deleteList.add(slaSummaryBean);
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
index d2331e8..44f5d87 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleActionQueryExecutor.java
@@ -21,8 +21,10 @@ import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
+
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
@@ -206,4 +208,9 @@ public class BundleActionQueryExecutor extends
}
}
+ @Override
+ public Object getSingleValue(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
+ throw new UnsupportedOperationException();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
index 319aea0..36cd968 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
@@ -20,6 +20,7 @@ package org.apache.oozie.executor.jpa;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
@@ -227,4 +228,9 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ
instance = null;
}
}
+
+ @Override
+ public Object getSingleValue(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index d56af7b..3008393 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -258,4 +258,9 @@ public class CoordActionQueryExecutor extends
instance = null;
}
}
+
+ @Override
+ public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index cddeaf7..04e6e29 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -372,4 +372,9 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
}
}
+ @Override
+ public Object getSingleValue(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
+ throw new UnsupportedOperationException();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
index 536743b..e319c07 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
@@ -72,4 +72,7 @@ public abstract class QueryExecutor<T, E extends Enum<E>> {
public abstract Query getSelectQuery(E namedQuery, EntityManager em, Object... parameters)
throws JPAExecutorException;
+ public abstract Object getSingleValue(E namedQuery, Object... parameters)
+ throws JPAExecutorException;
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
index e3b115f..3a43d7e 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java
@@ -18,6 +18,7 @@
package org.apache.oozie.executor.jpa;
import java.util.List;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
@@ -35,7 +36,9 @@ import com.google.common.annotations.VisibleForTesting;
public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationBean, SLARegistrationQueryExecutor.SLARegQuery> {
public enum SLARegQuery {
- UPDATE_SLA_REG_ALL
+ UPDATE_SLA_REG_ALL,
+ GET_SLA_REG_ALL,
+ GET_SLA_REG_ON_RESTART
};
private static SLARegistrationQueryExecutor instance = new SLARegistrationQueryExecutor();
@@ -86,8 +89,17 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
@Override
public Query getSelectQuery(SLARegQuery namedQuery, EntityManager em, Object... parameters)
throws JPAExecutorException {
- // TODO
- return null;
+ Query query = em.createNamedQuery(namedQuery.name());
+ switch (namedQuery) {
+ case GET_SLA_REG_ALL:
+ case GET_SLA_REG_ON_RESTART:
+ query.setParameter("id", parameters[0]);
+ break;
+ default:
+ throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ + namedQuery.name());
+ }
+ return query;
}
@Override
@@ -102,8 +114,11 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
public SLARegistrationBean get(SLARegQuery namedQuery, Object... parameters) throws JPAExecutorException {
EntityManager em = jpaService.getEntityManager();
Query query = getSelectQuery(namedQuery, em, parameters);
- @SuppressWarnings("unchecked")
- SLARegistrationBean bean = (SLARegistrationBean) jpaService.executeGet(namedQuery.name(), query, em);
+ Object ret = jpaService.executeGet(namedQuery.name(), query, em);
+ if (ret == null && !namedQuery.equals(SLARegQuery.GET_SLA_REG_ALL)) {
+ throw new JPAExecutorException(ErrorCode.E0604, query.toString());
+ }
+ SLARegistrationBean bean = constructBean(namedQuery, ret, parameters);
return bean;
}
@@ -117,6 +132,32 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
return beanList;
}
+ private SLARegistrationBean constructBean(SLARegQuery namedQuery, Object ret, Object... parameters)
+ throws JPAExecutorException {
+ SLARegistrationBean bean;
+ Object[] arr;
+ switch (namedQuery) {
+ case GET_SLA_REG_ALL:
+ bean = (SLARegistrationBean) ret;
+ if(bean != null) {
+ bean.setSlaConfig(bean.getSlaConfig());
+ }
+ break;
+ case GET_SLA_REG_ON_RESTART:
+ bean = new SLARegistrationBean();
+ arr = (Object[]) ret;
+ bean.setNotificationMsg((String) arr[0]);
+ bean.setUpstreamApps((String) arr[1]);
+ bean.setSlaConfig((String) arr[2]);
+ bean.setJobData((String) arr[3]);
+ break;
+ default:
+ throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+ + namedQuery.name());
+ }
+ return bean;
+ }
+
@VisibleForTesting
public static void destroy() {
if (instance != null) {
@@ -125,4 +166,8 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB
}
}
+ @Override
+ public Object getSingleValue(SLARegQuery namedQuery, Object... parameters) throws JPAExecutorException {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
index 79d11ed..f60e489 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
@@ -18,12 +18,15 @@
package org.apache.oozie.executor.jpa;
import java.util.List;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.SLASummaryBean;
import com.google.common.annotations.VisibleForTesting;
@@ -36,8 +39,11 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
public enum SLASummaryQuery {
UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+ UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES,
UPDATE_SLA_SUMMARY_ALL,
- GET_SLA_SUMMARY
+ UPDATE_SLA_SUMMARY_EVENTPROCESSED,
+ GET_SLA_SUMMARY,
+ GET_SLA_SUMMARY_EVENTPROCESSED
};
private static SLASummaryQueryExecutor instance = new SLASummaryQueryExecutor();
@@ -73,6 +79,14 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
query.setParameter("actualEndTS", bean.getActualEndTimestamp());
query.setParameter("actualDuration", bean.getActualDuration());
break;
+ case UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES:
+ query.setParameter("jobId", bean.getId());
+ query.setParameter("eventProcessed", bean.getEventProcessed());
+ query.setParameter("actualStartTS", bean.getActualStartTimestamp());
+ query.setParameter("actualEndTS", bean.getActualEndTimestamp());
+ query.setParameter("actualDuration", bean.getActualDuration());
+ query.setParameter("lastModifiedTS", bean.getLastModifiedTimestamp());
+ break;
case UPDATE_SLA_SUMMARY_ALL:
query.setParameter("appName", bean.getAppName());
query.setParameter("appType", bean.getAppType().toString());
@@ -92,6 +106,10 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
query.setParameter("actualStartTS", bean.getActualStartTimestamp());
query.setParameter("jobId", bean.getId());
break;
+ case UPDATE_SLA_SUMMARY_EVENTPROCESSED:
+ query.setParameter("eventProcessed", bean.getEventProcessed());
+ query.setParameter("jobId", bean.getId());
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
@@ -105,6 +123,7 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
Query query = em.createNamedQuery(namedQuery.name());
switch (namedQuery) {
case GET_SLA_SUMMARY:
+ case GET_SLA_SUMMARY_EVENTPROCESSED:
query.setParameter("id", parameters[0]);
break;
}
@@ -123,8 +142,11 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
public SLASummaryBean get(SLASummaryQuery namedQuery, Object... parameters) throws JPAExecutorException {
EntityManager em = jpaService.getEntityManager();
Query query = getSelectQuery(namedQuery, em, parameters);
- @SuppressWarnings("unchecked")
- SLASummaryBean bean = (SLASummaryBean) jpaService.executeGet(namedQuery.name(), query, em);
+ Object ret = jpaService.executeGet(namedQuery.name(), query, em);
+ if (ret == null && !namedQuery.equals(SLASummaryQuery.GET_SLA_SUMMARY)) {
+ throw new JPAExecutorException(ErrorCode.E0604, query.toString());
+ }
+ SLASummaryBean bean = constructBean(namedQuery, ret, parameters);
return bean;
}
@@ -137,6 +159,35 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
return beanList;
}
+ @Override
+ public Object getSingleValue(SLASummaryQuery namedQuery, Object... parameters) throws JPAExecutorException {
+ EntityManager em = jpaService.getEntityManager();
+ Query query = getSelectQuery(namedQuery, em, parameters);
+ Object ret = jpaService.executeGet(namedQuery.name(), query, em);
+ if (ret == null) {
+ throw new JPAExecutorException(ErrorCode.E0604, query.toString());
+ }
+ return ret;
+ }
+
+ private SLASummaryBean constructBean(SLASummaryQuery namedQuery, Object ret, Object... parameters)
+ throws JPAExecutorException {
+ SLASummaryBean bean;
+ switch (namedQuery) {
+ case GET_SLA_SUMMARY:
+ bean = (SLASummaryBean) ret;
+ break;
+ case GET_SLA_SUMMARY_EVENTPROCESSED:
+ bean = new SLASummaryBean();
+ bean.setEventProcessed(((Byte)ret).intValue());
+ break;
+ default:
+ throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+ + namedQuery.name());
+ }
+ return bean;
+ }
+
@VisibleForTesting
public static void destroy() {
if (instance != null) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 0c323a3..86f2c1b 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -20,6 +20,7 @@ package org.apache.oozie.executor.jpa;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
@@ -407,4 +408,9 @@ public class WorkflowActionQueryExecutor extends
instance = null;
}
}
+
+ @Override
+ public Object getSingleValue(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
index e7d42e9..e2a9438 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
@@ -50,6 +50,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
UPDATE_WORKFLOW_RERUN,
GET_WORKFLOW,
GET_WORKFLOW_STARTTIME,
+ GET_WORKFLOW_START_END_TIME,
GET_WORKFLOW_USER_GROUP,
GET_WORKFLOW_SUSPEND,
GET_WORKFLOW_ACTION_OP,
@@ -170,6 +171,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
switch (namedQuery) {
case GET_WORKFLOW:
case GET_WORKFLOW_STARTTIME:
+ case GET_WORKFLOW_START_END_TIME:
case GET_WORKFLOW_USER_GROUP:
case GET_WORKFLOW_SUSPEND:
case GET_WORKFLOW_ACTION_OP:
@@ -212,6 +214,13 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
bean.setId((String) arr[0]);
bean.setStartTime(DateUtils.toDate((Timestamp) arr[1]));
break;
+ case GET_WORKFLOW_START_END_TIME:
+ bean = new WorkflowJobBean();
+ arr = (Object[]) ret;
+ bean.setId((String) arr[0]);
+ bean.setStartTime(DateUtils.toDate((Timestamp) arr[1]));
+ bean.setEndTime(DateUtils.toDate((Timestamp) arr[2]));
+ break;
case GET_WORKFLOW_USER_GROUP:
bean = new WorkflowJobBean();
arr = (Object[]) ret;
@@ -353,4 +362,9 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
instance = null;
}
}
+
+ @Override
+ public Object getSingleValue(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
deleted file mode 100644
index 7b4e177..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetJPAExecutor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa.sla;
-
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.executor.jpa.JPAExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.sla.SLARegistrationBean;
-
-/**
- * Load the list of SLARegistrationBean and return the list.
- */
-public class SLARegistrationGetJPAExecutor implements JPAExecutor<SLARegistrationBean> {
-
- private String id = null;
-
- public SLARegistrationGetJPAExecutor(String id) {
- this.id = id;
- }
-
- @Override
- public String getName() {
- return "SLARegistrationGetJPAExecutor";
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public SLARegistrationBean execute(EntityManager em) throws JPAExecutorException {
- List<SLARegistrationBean> regBeans;
- try {
- Query q = em.createNamedQuery("GET_SLA_REG_ALL");
- q.setParameter("id", id);
- regBeans = q.getResultList();
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- SLARegistrationBean slaRegBean = null;
- if (regBeans != null && regBeans.size() > 0) {
- slaRegBean = regBeans.get(0);
- slaRegBean.setSlaConfig(slaRegBean.getSlaConfig());
- }
- return slaRegBean;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
deleted file mode 100644
index 1510daf..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLARegistrationGetOnRestartJPAExecutor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa.sla;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.executor.jpa.JPAExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.sla.SLARegistrationBean;
-
-/**
- * Load SLARegistrationBean on restart
- */
-public class SLARegistrationGetOnRestartJPAExecutor implements JPAExecutor<SLARegistrationBean> {
-
- private String id;
-
- public SLARegistrationGetOnRestartJPAExecutor(String id) {
- this.id = id;
- }
-
- @Override
- public String getName() {
- return "SLARegistrationGetOnRestartJPAExecutor";
- }
-
- @Override
- public SLARegistrationBean execute(EntityManager em) throws JPAExecutorException {
- try {
- Query q = em.createNamedQuery("GET_SLA_REG_ON_RESTART");
- q.setParameter("id", id);
- Object[] obj = (Object[]) q.getSingleResult();
- return getBeanFromObj(obj);
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- }
-
- private SLARegistrationBean getBeanFromObj(Object[] arr) {
- SLARegistrationBean bean = new SLARegistrationBean();
- if (arr[0] != null) {
- bean.setNotificationMsg((String) arr[0]);
- }
- if (arr[1] != null) {
- bean.setUpstreamApps((String) arr[1]);
- }
- if (arr[2] != null) {
- bean.setSlaConfig((String) arr[2]);
- }
- if (arr[3] != null) {
- bean.setJobData((String) arr[3]);
- }
- return bean;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetJPAExecutor.java
deleted file mode 100644
index 1f7cb4d..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryGetJPAExecutor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa.sla;
-
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.executor.jpa.JPAExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.sla.SLASummaryBean;
-
-/**
- * Load the list of SLASummaryBean (for dashboard) and return the list.
- */
-public class SLASummaryGetJPAExecutor implements JPAExecutor<SLASummaryBean> {
-
- private String id = null;
-
- public SLASummaryGetJPAExecutor(String id) {
- this.id = id;
- }
-
- @Override
- public String getName() {
- return "SLASummaryGetJPAExecutor";
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public SLASummaryBean execute(EntityManager em) throws JPAExecutorException {
- List<SLASummaryBean> ssBeans;
- Query q;
- try {
- q = em.createNamedQuery("GET_SLA_SUMMARY");
- q.setParameter("id", id);
- ssBeans = q.getResultList();
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- SLASummaryBean slaSummaryBean = null;
- if (ssBeans != null && ssBeans.size() > 0) {
- slaSummaryBean = ssBeans.get(0);
- }
- return slaSummaryBean;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/service/JPAService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/JPAService.java b/core/src/main/java/org/apache/oozie/service/JPAService.java
index 1b8f53e..857ec29 100644
--- a/core/src/main/java/org/apache/oozie/service/JPAService.java
+++ b/core/src/main/java/org/apache/oozie/service/JPAService.java
@@ -71,7 +71,6 @@ public class JPAService implements Service, Instrumentable {
public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password";
public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source";
public static final String CONF_CONN_PROPERTIES = CONF_PREFIX + "connection.properties";
-
public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn";
public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema";
public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection";
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
index 36adbd6..69025fc 100644
--- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
@@ -20,6 +20,7 @@ package org.apache.oozie.service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 1d5f4a4..58580e5 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.rest.RestConstants;
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index ea53712..f148db3 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -19,8 +19,19 @@
package org.apache.oozie.sla;
import java.util.Date;
+
import org.apache.oozie.AppType;
+import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.lock.LockToken;
+import org.apache.oozie.service.InstrumentationService;
+import org.apache.oozie.service.JobsConcurrencyService;
+import org.apache.oozie.service.MemoryLocksService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.util.XLog;
/**
* Class used by SLAService to store SLA objects and perform calculations and
@@ -28,6 +39,7 @@ import org.apache.oozie.client.event.SLAEvent;
*/
public class SLACalcStatus extends SLAEvent {
+ public static String SLA_ENTITYKEY_PREFIX = "sla-";
private SLARegistrationBean regBean;
private String jobStatus;
private SLAStatus slaStatus;
@@ -37,6 +49,7 @@ public class SLACalcStatus extends SLAEvent {
private long actualDuration = -1;
private Date lastModifiedTime;
private byte eventProcessed;
+ private LockToken lock;
public SLACalcStatus(SLARegistrationBean reg) {
this();
@@ -264,4 +277,55 @@ public class SLACalcStatus extends SLAEvent {
return lastModifiedTime;
}
+ public String getEntityKey() {
+ return SLA_ENTITYKEY_PREFIX + this.getId();
+ }
+ /**
+ * Obtain an exclusive lock on the {link #getEntityKey}.
+ * <p/>
+ * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
+ *
+ * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
+ */
+ public void acquireLock() throws InterruptedException {
+ // only get ZK lock when multiple servers running
+ if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
+ lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
+ if (lock == null) {
+ XLog.getLog(getClass()).debug("Could not aquire lock for [{0}]", getEntityKey());
+ }
+ else {
+ XLog.getLog(getClass()).debug("Acquired lock for [{0}]", getEntityKey());
+ }
+ }
+ else {
+ lock = new DummyToken();
+ }
+ }
+
+ private static class DummyToken implements LockToken {
+ @Override
+ public void release() {
+ }
+ }
+
+ public boolean isLocked() {
+ boolean locked = false;
+ if(lock != null) {
+ locked = true;
+ }
+ return locked;
+ }
+
+ public void releaseLock(){
+ if (lock != null) {
+ lock.release();
+ lock = null;
+ XLog.getLog(getClass()).debug("Released lock for [{0}]", getEntityKey());
+ }
+ }
+
+ public long getLockTimeOut() {
+ return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000);
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index 618d899..47c723d 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -31,9 +31,11 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
@@ -43,24 +45,37 @@ import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
+import org.apache.oozie.lock.LockToken;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.JobsConcurrencyService;
+import org.apache.oozie.service.MemoryLocksService;
+import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XLog;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Implementation class for SLACalculator that calculates SLA related to
@@ -70,11 +85,11 @@ public class SLACalculatorMemory implements SLACalculator {
private static final XLog LOG = XLog.getLog(SLACalculatorMemory.class);
// TODO optimization priority based insertion/processing/bumping up-down
- private static Map<String, SLACalcStatus> slaMap;
- private static Set<String> historySet;
+ protected Map<String, SLACalcStatus> slaMap;
+ protected static Set<String> historySet;
private static int capacity;
private static JPAService jpaService;
- private EventHandlerService eventHandler;
+ protected EventHandlerService eventHandler;
private static int modifiedAfter;
private static long jobEventLatency;
@@ -89,7 +104,154 @@ public class SLACalculatorMemory implements SLACalculator {
// load events modified after
modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
loadOnRestart();
+ Runnable purgeThread = new HistoryPurgeWorker();
+ // schedule runnable by default 1 day
+ Services.get()
+ .get(SchedulerService.class)
+ .schedule(purgeThread, 86400, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 86400),
+ SchedulerService.Unit.SEC);
+ }
+
+ public class HistoryPurgeWorker implements Runnable {
+
+ public HistoryPurgeWorker() {
+ }
+
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ Iterator<String> jobItr = historySet.iterator();
+ while (jobItr.hasNext()) {
+ String jobId = jobItr.next();
+
+ if (jobId.endsWith("-W")) {
+ WorkflowJobBean wfJob = null;
+ try {
+ wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
+ }
+ catch (JPAExecutorException e) {
+ if (e.getErrorCode().equals(ErrorCode.E0604)) {
+ jobItr.remove();
+ }
+ else {
+ LOG.info("Failed to fetch the workflow job: " + jobId, e);
+ }
+ }
+ if (wfJob != null && wfJob.inTerminalState()) {
+ try {
+ updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime());
+ jobItr.remove();
+ }
+ catch (JPAExecutorException e) {
+ LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
+ }
+ }
+ }
+ else if (jobId.contains("-W@")) {
+ WorkflowActionBean wfAction = null;
+ try {
+ wfAction = WorkflowActionQueryExecutor.getInstance().get(
+ WorkflowActionQuery.GET_ACTION_COMPLETED, jobId);
+ }
+ catch (JPAExecutorException e) {
+ if (e.getErrorCode().equals(ErrorCode.E0605)) {
+ jobItr.remove();
+ }
+ else {
+ LOG.info("Failed to fetch the workflow action: " + jobId, e);
+ }
+ }
+ if (wfAction != null && (wfAction.isComplete() || wfAction.isTerminalWithFailure())) {
+ try {
+ updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime());
+ jobItr.remove();
+ }
+ catch (JPAExecutorException e) {
+ LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
+ }
+ }
+ }
+ else if (jobId.contains("-C@")) {
+ CoordinatorActionBean cAction = null;
+ try {
+ cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId);
+ }
+ catch (JPAExecutorException e) {
+ if (e.getErrorCode().equals(ErrorCode.E0605)) {
+ jobItr.remove();
+ }
+ else {
+ LOG.info("Failed to fetch the coord action: " + jobId, e);
+ }
+ }
+ if (cAction != null && cAction.isTerminalStatus()) {
+ try {
+ updateSLASummaryForCoordAction(cAction);
+ jobItr.remove();
+ }
+ catch (JPAExecutorException e) {
+ XLog.getLog(SLACalculatorMemory.class).info(
+ "Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
+ }
+
+ }
+ }
+ else if (jobId.endsWith("-C")) {
+ CoordinatorJobBean cJob = null;
+ try {
+ cJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID,
+ jobId);
+ }
+ catch (JPAExecutorException e) {
+ if (e.getErrorCode().equals(ErrorCode.E0604)) {
+ jobItr.remove();
+ }
+ else {
+ LOG.info("Failed to fetch the coord job: " + jobId, e);
+ }
+ }
+ if (cJob != null && cJob.isTerminalStatus()) {
+ try {
+ updateSLASummary(cJob.getId(), cJob.getStartTime(), cJob.getEndTime());
+ jobItr.remove();
+ }
+ catch (JPAExecutorException e) {
+ LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
+ }
+
+ }
+ }
+ }
+ }
+
+ private void updateSLASummary(String id, Date startTime, Date endTime) throws JPAExecutorException {
+ SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
+ if (sla != null) {
+ sla.setActualStart(startTime);
+ sla.setActualEnd(endTime);
+ if (startTime != null && endTime != null) {
+ sla.setActualDuration(endTime.getTime() - startTime.getTime());
+ }
+ sla.setLastModifiedTime(new Date());
+ sla.setEventProcessed(8);
+ SLASummaryQueryExecutor.getInstance().executeUpdate(
+ SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, sla);
+ }
+ }
+
+ private void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException {
+ String wrkflowId = bean.getExternalId();
+ if (wrkflowId != null) {
+ WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get(
+ WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId);
+ if (wrkflow != null) {
+ updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime());
+ }
+ }
+ }
}
private void loadOnRestart() {
@@ -101,28 +263,50 @@ public class SLACalculatorMemory implements SLACalculator {
modifiedAfter));
for (SLASummaryBean summaryBean : summaryBeans) {
String jobId = summaryBean.getId();
- try {
- switch (summaryBean.getAppType()) {
- case COORDINATOR_ACTION:
- isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
- break;
- case WORKFLOW_ACTION:
- isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
- break;
- case WORKFLOW_JOB:
- isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
- break;
- default:
- break;
- }
- if (isJobModified) {
- summaryBean.setLastModifiedTime(new Date());
- SLASummaryQueryExecutor.getInstance().executeUpdate(
- SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
- }
+ LockToken lock = null;
+ switch (summaryBean.getAppType()) {
+ case COORDINATOR_ACTION:
+ isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
+ break;
+ case WORKFLOW_ACTION:
+ isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
+ break;
+ case WORKFLOW_JOB:
+ isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
+ break;
+ default:
+ break;
}
- catch (Exception e) {
- LOG.warn("Failed to load records for " + jobId, e);
+ if (isJobModified) {
+ try {
+ boolean update = true;
+ if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
+ lock = Services
+ .get()
+ .get(MemoryLocksService.class)
+ .getWriteLock(
+ SLACalcStatus.SLA_ENTITYKEY_PREFIX + jobId,
+ Services.get().getConf()
+ .getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000));
+ if (lock == null) {
+ update = false;
+ }
+ }
+ if (update) {
+ summaryBean.setLastModifiedTime(new Date());
+ SLASummaryQueryExecutor.getInstance().executeUpdate(
+ SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
+ }
+ }
+ catch (Exception e) {
+ LOG.warn("Failed to load records for " + jobId, e);
+ }
+ finally {
+ if (lock != null) {
+ lock.release();
+ lock = null;
+ }
+ }
}
try {
if (summaryBean.getEventProcessed() == 7) {
@@ -130,8 +314,8 @@ public class SLACalculatorMemory implements SLACalculator {
statusPendingCount++;
}
else if (summaryBean.getEventProcessed() <= 7) {
- SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
- jobId));
+ SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
slaMap.put(jobId, slaCalcStatus);
slaPendingCount++;
@@ -260,8 +444,8 @@ public class SLACalculatorMemory implements SLACalculator {
SLACalcStatus memObj;
memObj = slaMap.get(jobId);
if (memObj == null && historySet.contains(jobId)) {
- memObj = new SLACalcStatus(jpaService.execute(new SLASummaryGetJPAExecutor(jobId)),
- jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(jobId)));
+ memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId),
+ SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
}
return memObj;
}
@@ -285,11 +469,15 @@ public class SLACalculatorMemory implements SLACalculator {
/**
* Invoked via periodic run, update the SLA for registered jobs
*/
- protected void updateJobSla(String jobId) throws JPAExecutorException, ServiceException {
+ protected void updateJobSla(String jobId) throws Exception {
SLACalcStatus slaCalc = slaMap.get(jobId);
synchronized (slaCalc) {
boolean change = false;
- byte eventProc = slaCalc.getEventProcessed();
+ // get eventProcessed on DB for validation in HA
+ Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue(
+ SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
+ byte eventProc = ((Byte) eventProcObj).byteValue();
+ slaCalc.setEventProcessed(eventProc);
SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
// calculation w.r.t current time and status
if ((eventProc & 1) == 0) { // first bit (start-processed) unset
@@ -297,8 +485,8 @@ public class SLACalculatorMemory implements SLACalculator {
if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
confirmWithDB(slaCalc);
eventProc = slaCalc.getEventProcessed();
- if (eventProc != 8 && (eventProc & 1 ) == 0) {
- //Some DB exception
+ if (eventProc != 8 && (eventProc & 1) == 0) {
+ // Some DB exception
slaCalc.setEventStatus(EventStatus.START_MISS);
eventHandler.queueEvent(new SLACalcStatus(slaCalc));
eventProc++;
@@ -307,11 +495,13 @@ public class SLACalculatorMemory implements SLACalculator {
}
}
else {
- eventProc++; //disable further processing for optional start sla condition
+ eventProc++; // disable further processing for optional
+ // start sla condition
change = true;
}
}
- if (((eventProc >> 1) & 1) == 0 && eventProc != 8) { // check if second bit (duration-processed) is unset
+ // check if second bit (duration-processed) is unset
+ if (((eventProc >> 1) & 1) == 0 && eventProc != 8) {
if (reg.getExpectedDuration() == -1) {
eventProc += 2;
change = true;
@@ -322,8 +512,8 @@ public class SLACalculatorMemory implements SLACalculator {
slaCalc.setEventProcessed(eventProc);
confirmWithDB(slaCalc);
eventProc = slaCalc.getEventProcessed();
- if (eventProc != 8 && ((eventProc >> 1) & 1 ) == 0) {
- //Some DB exception
+ if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+ // Some DB exception
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
eventHandler.queueEvent(new SLACalcStatus(slaCalc));
eventProc += 2;
@@ -341,32 +531,51 @@ public class SLACalculatorMemory implements SLACalculator {
}
}
if (change) {
- if (slaCalc.getEventProcessed() >= 8) { //no more processing, no transfer to history set
- eventProc = 8;
- slaCalc.setEventProcessed(8); // Should not be > 8. But to handle any corner cases.
- slaMap.remove(jobId);
+ try {
+ boolean locked = true;
+ slaCalc.acquireLock();
+ locked = slaCalc.isLocked();
+ if (locked) {
+ // no more processing, no transfer to history set
+ if (slaCalc.getEventProcessed() >= 8) {
+ eventProc = 8;
+ // Should not be > 8. But to handle any corner cases
+ slaCalc.setEventProcessed(8);
+ slaMap.remove(jobId);
+ }
+ else {
+ slaCalc.setEventProcessed(eventProc);
+ }
+ SLASummaryBean slaSummaryBean = new SLASummaryBean();
+ slaSummaryBean.setId(slaCalc.getId());
+ slaSummaryBean.setEventProcessed(eventProc);
+ slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+ slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+ slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+ slaSummaryBean.setActualStart(slaCalc.getActualStart());
+ slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+ slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+ slaSummaryBean.setLastModifiedTime(new Date());
+ SLASummaryQueryExecutor.getInstance().executeUpdate(
+ SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
+ if (eventProc == 7) {
+ historySet.add(jobId);
+ slaMap.remove(jobId);
+ LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
+ }
+ }
}
- else {
- slaCalc.setEventProcessed(eventProc);
+ catch (InterruptedException e) {
+ throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut());
}
- SLASummaryBean slaSummaryBean = new SLASummaryBean();
- slaSummaryBean.setId(slaCalc.getId());
- slaSummaryBean.setEventProcessed(eventProc);
- slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
- slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
- slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
- slaSummaryBean.setActualStart(slaCalc.getActualStart());
- slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
- slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
- slaSummaryBean.setLastModifiedTime(new Date());
- SLASummaryQueryExecutor.getInstance().executeUpdate(
- SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
- if (eventProc == 7) {
- historySet.add(jobId);
- slaMap.remove(jobId);
- LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
+ finally {
+ slaCalc.releaseLock();
}
-
+ }
+ else if (eventProc >= 7) {
+ historySet.add(jobId);
+ slaMap.remove(jobId);
+ LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
}
}
}
@@ -494,46 +703,77 @@ public class SLACalculatorMemory implements SLACalculator {
SLACalcStatus slaCalc = slaMap.get(jobId);
SLASummaryBean slaInfo = null;
boolean hasSla = false;
- if (slaCalc != null) {
- synchronized (slaCalc) {
- slaCalc.setJobStatus(jobStatus);
- switch (jobEventStatus) {
- case STARTED:
- slaInfo = processJobStartSLA(slaCalc, startTime);
- break;
- case SUCCESS:
- slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
- break;
- case FAILURE:
- slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
- break;
- default:
- LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
- slaInfo = getSLASummaryBean(slaCalc);
+ if (slaCalc == null) {
+ if (historySet.contains(jobId)) {
+ slaInfo = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+ if (slaInfo == null) {
+ throw new JPAExecutorException(ErrorCode.E0604, jobId);
}
-
- if (slaCalc.getEventProcessed() == 7) {
- slaInfo.setEventProcessed(8);
- slaMap.remove(jobId);
+ slaInfo.setJobStatus(jobStatus);
+ slaInfo.setActualStart(startTime);
+ slaInfo.setActualEnd(endTime);
+ if (endTime != null) {
+ slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
}
+ slaInfo.setEventProcessed(8);
+ historySet.remove(jobId);
hasSla = true;
+ } else {
+ // jobid might not exist in slaMap in HA Setting
+ SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_REG_ALL, jobId);
+ SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY,
+ jobId);
+ slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
+ if(slaCalc.getEventProcessed() < 7){
+ slaMap.put(jobId, slaCalc);
+ }
}
- LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
}
- else if (historySet.contains(jobId)) {
- slaInfo = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
- if (slaInfo == null) {
- throw new JPAExecutorException(ErrorCode.E0604, jobId);
- }
- slaInfo.setJobStatus(jobStatus);
- slaInfo.setActualStart(startTime);
- slaInfo.setActualEnd(endTime);
- if (endTime != null) {
- slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
+ if (slaCalc != null) {
+ synchronized (slaCalc) {
+ try {
+ // only get ZK lock when multiple servers running
+ boolean locked = true;
+ slaCalc.acquireLock();
+ locked = slaCalc.isLocked();
+ if (locked) {
+ // get eventProcessed on DB for validation in HA
+ Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance())
+ .getSingleValue(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
+ byte eventProc = ((Byte) eventProcObj).byteValue();
+ slaCalc.setEventProcessed(eventProc);
+ slaCalc.setJobStatus(jobStatus);
+ switch (jobEventStatus) {
+ case STARTED:
+ slaInfo = processJobStartSLA(slaCalc, startTime);
+ break;
+ case SUCCESS:
+ slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
+ break;
+ case FAILURE:
+ slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
+ break;
+ default:
+ LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
+ slaInfo = getSLASummaryBean(slaCalc);
+ }
+
+ if (slaCalc.getEventProcessed() == 7) {
+ slaInfo.setEventProcessed(8);
+ slaMap.remove(jobId);
+ }
+ hasSla = true;
+ }
+ }
+ catch (InterruptedException e) {
+ throw new ServiceException(ErrorCode.E0606, slaCalc.getEntityKey(), slaCalc.getLockTimeOut());
+ }
+ finally {
+ slaCalc.releaseLock();
+ }
}
- slaInfo.setEventProcessed(8);
- historySet.remove(jobId);
- hasSla = true;
+ LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
}
if (hasSla) {
@@ -829,4 +1069,13 @@ public class SLACalculatorMemory implements SLACalculator {
}
}
+ @VisibleForTesting
+ public boolean isJobIdInSLAMap(String jobId) {
+ return this.slaMap.containsKey(jobId);
+ }
+
+ @VisibleForTesting
+ public boolean isJobIdInHistorySet(String jobId) {
+ return this.historySet.contains(jobId);
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLAOperations.java b/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
index b0da3dc..0cad071 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
@@ -19,12 +19,14 @@ package org.apache.oozie.sla;
import java.text.ParseException;
import java.util.Date;
+
import org.apache.oozie.AppType;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
@@ -165,7 +167,7 @@ public class SLAOperations {
JPAService jpaService = Services.get().get(JPAService.class);
SLAService slaService = Services.get().get(SLAService.class);
try {
- SLARegistrationBean reg = jpaService.execute(new SLARegistrationGetJPAExecutor(jobId));
+ SLARegistrationBean reg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
if (reg != null) { //handle coord rerun with different config without sla
slaService.updateRegistrationEvent(reg);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
index 0a70326..80883ee 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
@@ -47,11 +47,18 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId = :jobId"),
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration, w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"),
+
+ @NamedQuery(name = "UPDATE_SLA_SUMMARY_EVENTPROCESSED", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed where w.jobId = :jobId"),
+
@NamedQuery(name = "UPDATE_SLA_SUMMARY_ALL", query = "update SLASummaryBean w set w.jobId = :jobId, w.appName = :appName, w.appType = :appType, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.jobStatus = :jobStatus, w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.lastModifiedTS = :lastModTime, w.user = :user, w.parentId = :parentId, w.eventProcessed = :eventProcessed, w.actualDuration = :actualDuration, w.actualEndTS = :actualEndTS, w.actualStartTS = :actualStartTS where w.jobId = :jobId"),
@NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id"),
- @NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime") })
+ @NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime"),
+
+ @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED", query = "select w.eventProcessed from SLASummaryBean w where w.jobId = :id")
+})
/**
* Class to store all the SLA related details (summary) per job
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
index 7fcb334..2349329 100644
--- a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
+++ b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
@@ -18,6 +18,7 @@
package org.apache.oozie.sla.service;
import java.util.Date;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.JobEvent.EventStatus;
@@ -31,6 +32,7 @@ import org.apache.oozie.sla.SLACalculator;
import org.apache.oozie.sla.SLACalculatorMemory;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.util.XLog;
+
import com.google.common.annotations.VisibleForTesting;
public class SLAService implements Service {
@@ -43,6 +45,9 @@ public class SLAService implements Service {
public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
//Time interval, in seconds, at which SLA Worker will be scheduled to run
public static final String CONF_SLA_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
+ public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay";
+ public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout";
+ public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval";
private static SLACalculator calcImpl;
private static boolean slaEnabled = false;
@@ -69,7 +74,9 @@ public class SLAService implements Service {
Runnable slaThread = new SLAWorker(calcImpl);
// schedule runnable by default every 30 sec
int slaCheckInterval = services.getConf().getInt(CONF_SLA_CHECK_INTERVAL, 30);
- services.get(SchedulerService.class).schedule(slaThread, 10, slaCheckInterval, SchedulerService.Unit.SEC);
+ int slaCheckInitialDelay = services.getConf().getInt(CONF_SLA_CHECK_INITIAL_DELAY, 10);
+ services.get(SchedulerService.class).schedule(slaThread, slaCheckInitialDelay, slaCheckInterval,
+ SchedulerService.Unit.SEC);
slaEnabled = true;
LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
conf.get(SLAService.CONF_CAPACITY));
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index 86c2d32..4e842cf 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -36,10 +36,12 @@ import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StatusTransitService;
@@ -611,17 +613,17 @@ public class TestCoordChangeXCommand extends XDataTestCase {
assertEquals(ErrorCode.E0603, jpae.getErrorCode());
}
- slaRegBean1 = jpaService.execute(new SLARegistrationGetJPAExecutor(slaRegBean1.getId()));
+ slaRegBean1 = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, slaRegBean1.getId());
assertNotNull(slaRegBean1);
- slaRegBean2 = jpaService.execute(new SLARegistrationGetJPAExecutor(slaRegBean2.getId()));
+ slaRegBean2 = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, slaRegBean2.getId());
assertNotNull(slaRegBean2);
- slaRegBean3 = jpaService.execute(new SLARegistrationGetJPAExecutor(slaRegBean3.getId()));
+ slaRegBean3 = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, slaRegBean3.getId());
assertNull(slaRegBean3);
- slaRegBean4 = jpaService.execute(new SLARegistrationGetJPAExecutor(slaRegBean4.getId()));
+ slaRegBean4 = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, slaRegBean4.getId());
assertNull(slaRegBean4);
- slaSummaryBean3 = jpaService.execute(new SLASummaryGetJPAExecutor(slaSummaryBean3.getId()));
+ slaSummaryBean3 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean3.getId());
assertNull(slaSummaryBean3);
- slaSummaryBean1 = jpaService.execute(new SLASummaryGetJPAExecutor(slaSummaryBean1.getId()));
+ slaSummaryBean1 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean1.getId());
assertNotNull(slaSummaryBean1);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/4e015d45/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java
index 00fb677..b820437 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLARegistrationQueryExecutor.java
@@ -44,7 +44,7 @@ public class TestSLARegistrationQueryExecutor extends XDataTestCase {
super.tearDown();
}
- public void testGetQuery() throws Exception {
+ public void testGetUpdateQuery() throws Exception {
EntityManager em = jpaService.getEntityManager();
SLARegistrationBean bean = addRecordToSLARegistrationTable("test-application", SLAStatus.MET);
@@ -69,11 +69,56 @@ public class TestSLARegistrationQueryExecutor extends XDataTestCase {
em.close();
}
- public void testExecuteUpdate() throws Exception {
- // TODO
+ public void testGetSelectQuery() throws Exception {
+ EntityManager em = jpaService.getEntityManager();
+ SLARegistrationBean bean = addRecordToSLARegistrationTable("test-application", SLAStatus.MET);
+ // GET_SLA_REG_ALL
+ Query query = SLARegistrationQueryExecutor.getInstance().getSelectQuery(SLARegQuery.GET_SLA_REG_ALL, em,
+ bean.getId());
+ assertEquals(query.getParameterValue("id"), bean.getId());
+
+ // GET_WORKFLOW_SUSPEND
+ query = SLARegistrationQueryExecutor.getInstance().getSelectQuery(SLARegQuery.GET_SLA_REG_ON_RESTART, em,
+ bean.getId());
+ assertEquals(query.getParameterValue("id"), bean.getId());
}
public void testGet() throws Exception {
+
+ SLARegistrationBean bean = addRecordToSLARegistrationTable("test-application", SLAStatus.MET);
+ //GET_SLA_REG_ON_RESTART
+ SLARegistrationBean retBean = SLARegistrationQueryExecutor.getInstance().get(
+ SLARegQuery.GET_SLA_REG_ON_RESTART, bean.getId());
+ assertEquals(bean.getJobData(), retBean.getJobData());
+ assertEquals(bean.getSlaConfig(), retBean.getSlaConfig());
+ assertEquals(bean.getUpstreamApps(), retBean.getUpstreamApps());
+ assertEquals(bean.getNotificationMsg(), retBean.getNotificationMsg());
+ assertNull(retBean.getAppName());
+ assertNull(retBean.getExpectedEnd());
+ assertNull(retBean.getExpectedStart());
+ assertNull(retBean.getCreatedTime());
+ assertNull(retBean.getNominalTime());
+ assertNull(retBean.getUser());
+ assertNull(retBean.getParentId());
+ //GET_SLA_REG_ALL
+ retBean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, bean.getId());
+ assertEquals(bean.getId(), retBean.getId());
+ assertEquals(bean.getAppName(), retBean.getAppName());
+ assertEquals(bean.getAppType(), retBean.getAppType());
+ assertEquals(bean.getExpectedDuration(), retBean.getExpectedDuration());
+ assertEquals(bean.getExpectedStart().getTime(), retBean.getExpectedStart().getTime());
+ assertEquals(bean.getExpectedEnd().getTime(), retBean.getExpectedEnd().getTime());
+ assertEquals(bean.getCreatedTime().getTime(), retBean.getCreatedTime().getTime());
+ assertEquals(bean.getNominalTime().getTime(), retBean.getNominalTime().getTime());
+ assertEquals(bean.getNotificationMsg(), retBean.getNotificationMsg());
+ assertEquals(bean.getJobData(), retBean.getJobData());
+ assertEquals(bean.getParentId(), retBean.getParentId());
+ assertEquals(bean.getSlaConfig(), retBean.getSlaConfig());
+ assertEquals(bean.getUpstreamApps(), retBean.getUpstreamApps());
+ assertEquals(bean.getUser(), retBean.getUser());
+}
+
+ public void testExecuteUpdate() throws Exception {
// TODO
}