You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/05/17 22:08:47 UTC
[3/3] oozie git commit: OOZIE-2509 SLA job status can stuck in
running state
OOZIE-2509 SLA job status can stuck in running state
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ba7a7b85
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ba7a7b85
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ba7a7b85
Branch: refs/heads/master
Commit: ba7a7b85e040a313fa107474768dd67a325f91d5
Parents: 5fbd3eb
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue May 17 15:08:35 2016 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue May 17 15:08:35 2016 -0700
----------------------------------------------------------------------
.../command/coord/CoordActionCheckXCommand.java | 6 +-
.../sla/SLACoordActionJobEventXCommand.java | 80 ++
.../sla/SLACoordActionJobHistoryXCommand.java | 78 ++
.../oozie/command/sla/SLAJobEventXCommand.java | 301 ++++++
.../command/sla/SLAJobHistoryXCommand.java | 127 +++
.../sla/SLAWorkflowActionJobEventXCommand.java | 62 ++
.../SLAWorkflowActionJobHistoryXCommand.java | 57 ++
.../sla/SLAWorkflowJobEventXCommand.java | 64 ++
.../sla/SLAWorkflowJobHistoryXCommand.java | 56 ++
.../jpa/CoordActionGetForSLAJPAExecutor.java | 82 --
.../executor/jpa/CoordActionQueryExecutor.java | 13 +-
.../executor/jpa/SLASummaryQueryExecutor.java | 9 -
.../jpa/WorkflowActionGetForSLAJPAExecutor.java | 78 --
.../jpa/WorkflowActionQueryExecutor.java | 13 +-
.../jpa/WorkflowJobGetForSLAJPAExecutor.java | 78 --
.../executor/jpa/WorkflowJobQueryExecutor.java | 13 +-
.../oozie/service/ConfigurationService.java | 12 +-
.../org/apache/oozie/sla/SLACalcStatus.java | 54 -
.../apache/oozie/sla/SLACalculatorMemory.java | 989 +++----------------
.../org/apache/oozie/sla/SLASummaryBean.java | 2 -
.../apache/oozie/sla/SLAXCommandFactory.java | 92 ++
.../coord/TestCoordActionsKillXCommand.java | 11 +-
.../jpa/TestSLASummaryQueryExecutor.java | 2 +-
.../apache/oozie/service/TestHASLAService.java | 64 +-
.../oozie/sla/TestSLACalculatorMemory.java | 344 +++++--
.../oozie/sla/TestSLAEventGeneration.java | 1 +
.../oozie/sla/TestSLAJobEventListener.java | 136 ++-
.../org/apache/oozie/sla/TestSLAService.java | 123 ++-
release-log.txt | 1 +
29 files changed, 1668 insertions(+), 1280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
index 128feb2..bdbbd24 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
@@ -47,9 +47,10 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
/**
* The command checks workflow status for coordinator action.
@@ -177,7 +178,8 @@ public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> {
coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId));
coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
coordAction.getJobId()));
- workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId()));
+ workflowJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+ coordAction.getExternalId());
LogUtils.setLogInfo(coordAction);
}
else {
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
new file mode 100644
index 0000000..dfe2637
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobEventXCommand.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA event command for a coordinator action.
+ * <p>
+ * Loads the coordinator action and, when the action has an external id, the workflow job behind it, then copies
+ * the job status and the actual start/end times onto the SLA calculation object.
+ */
+public class SLACoordActionJobEventXCommand extends SLAJobEventXCommand {
+    CoordinatorActionBean ca;
+    WorkflowJobBean wf;
+
+    /**
+     * @param slaCalc the SLA calculation object of the coordinator action
+     * @param lockTimeOut the lock timeout handed to the base command
+     */
+    public SLACoordActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super(slaCalc, lockTimeOut);
+    }
+
+    /**
+     * Loads the coordinator action and, if it has an external id, the corresponding workflow job.
+     *
+     * @throws CommandException if either query fails
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            ca = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, slaCalc.getId());
+            if (ca.getExternalId() != null) {
+                wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, ca.getExternalId());
+            }
+            LogUtils.setLogInfo(ca);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+    }
+
+    /**
+     * Transfers the state of the coordinator action (and of its workflow, when present) to the SLA object.
+     * <p>
+     * A terminal action is marked as ended; it is an end miss when it terminated with failure or when the workflow
+     * finished after the expected end.
+     */
+    @Override
+    protected void updateJobInfo() {
+        if (ca.isTerminalStatus()) {
+            setEnded(true);
+            setEndMiss(ca.isTerminalWithFailure());
+            // Default to the action's last-modified time; replaced below when the workflow reports an end time.
+            slaCalc.setActualEnd(ca.getLastModifiedTime());
+            if (wf != null && wf.getEndTime() != null) {
+                if (slaCalc.getExpectedEnd() != null
+                        && wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
+                    setEndMiss(true);
+                }
+                slaCalc.setActualEnd(wf.getEndTime());
+            }
+        }
+        // The actual start always comes from the workflow, whether or not the action has ended.
+        if (wf != null) {
+            slaCalc.setActualStart(wf.getStartTime());
+        }
+        slaCalc.setJobStatus(ca.getStatusStr());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
new file mode 100644
index 0000000..b7f09d3
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLACoordActionJobHistoryXCommand.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA history command for a coordinator action: once the action has reached a terminal status, its final status
+ * and actual times are written to the SLA summary.
+ */
+public class SLACoordActionJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    CoordinatorActionBean cAction = null;
+
+    /**
+     * @param jobId id of the coordinator action
+     */
+    public SLACoordActionJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Loads the coordinator action identified by the job id.
+     *
+     * @throws CommandException if the query fails
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_FOR_SLA, jobId);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(cAction);
+    }
+
+    /**
+     * Updates the SLA summary of the loaded coordinator action.
+     *
+     * @throws CommandException if a database operation fails
+     */
+    @Override
+    protected void updateSLASummary() throws CommandException {
+        try {
+            updateSLASummaryForCoordAction(cAction);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+    }
+
+    /**
+     * Updates the SLA summary for the given coordinator action.
+     * <p>
+     * If the action has an external (workflow) id, the workflow's start and end times are used; when that workflow
+     * cannot be loaded nothing is updated. Without an external id the start time is left empty and the action's
+     * last-modified time is used as the end time.
+     *
+     * @param bean the coordinator action
+     * @throws JPAExecutorException if a database operation fails
+     */
+    protected void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException {
+        String wrkflowId = bean.getExternalId();
+        if (wrkflowId != null) {
+            WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get(
+                    WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId);
+            if (wrkflow != null) {
+                updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime(), bean.getStatusStr());
+            }
+        }
+        else {
+            updateSLASummary(bean.getId(), null, bean.getLastModifiedTime(), bean.getStatusStr());
+        }
+    }
+
+    @Override
+    protected boolean isJobEnded() {
+        return cAction.isTerminalStatus();
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
new file mode 100644
index 0000000..9b18606
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobEventXCommand.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import java.util.Date;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.event.SLAEvent.EventStatus;
+import org.apache.oozie.client.event.SLAEvent.SLAStatus;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.sla.SLASummaryBean;
+
+/**
+ * Base command that evaluates the SLA of a single job while holding that job's SLA lock.
+ * <p>
+ * Subclasses load the job and report its state through {@link #updateJobInfo()}. This class then raises the
+ * start/duration/end SLA events that are due and persists the outcome through {@link SLASummaryQueryExecutor}.
+ * <p>
+ * The event-processed value of the SLA object is handled as a bit mask: 1 = start event handled, 2 = duration
+ * event handled, 4 = end event handled. The value 8 marks the entry as completely processed.
+ */
+public abstract class SLAJobEventXCommand extends XCommand<Void> {
+    private long lockTimeOut = 0;
+    JPAService jpaService = Services.get().get(JPAService.class);
+    SLACalcStatus slaCalc;
+    final static String SLA_LOCK_PREFIX = "sla_";
+    private boolean isEnded = false;
+    private boolean isEndMiss = false;
+
+    /**
+     * @param slaCalc the SLA calculation object to evaluate and update
+     * @param lockTimeOut the value returned by {@link #getLockTimeOut()}
+     */
+    public SLAJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+        super("SLA.job.event", "SLA.job.event", 1);
+        this.slaCalc = slaCalc;
+        this.lockTimeOut = lockTimeOut;
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    /**
+     * @return the SLA lock key, i.e. {@link #SLA_LOCK_PREFIX} followed by the job id
+     */
+    @Override
+    public String getEntityKey() {
+        return SLA_LOCK_PREFIX + slaCalc.getId();
+    }
+
+    protected long getLockTimeOut() {
+        return lockTimeOut;
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+    /**
+     * Refreshes the job information, runs the SLA processing that matches the job state (ended or still running)
+     * and stores the result.
+     *
+     * @return always {@code null}
+     * @throws CommandException if storing the SLA summary fails
+     */
+    @Override
+    protected Void execute() throws CommandException {
+        updateJobInfo();
+        if (isEnded) {
+            processForEnd();
+        }
+        else {
+            processForRunning();
+        }
+        try {
+            writeToDB();
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+        return null;
+    }
+
+    /**
+     * Copies the current state of the loaded job onto {@code slaCalc} (job status, actual start and end time) and
+     * records through {@link #setEnded(boolean)} / {@link #setEndMiss(boolean)} whether the job has ended and
+     * whether it missed its end SLA.
+     */
+    protected abstract void updateJobInfo();
+
+    /**
+     * Tells whether alerts are enabled for the given SLA object.
+     *
+     * @param slaObj the sla obj
+     * @return true unless the SLA configuration contains {@link OozieClient#SLA_DISABLE_ALERT}
+     */
+    private boolean shouldAlert(SLACalcStatus slaObj) {
+        return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
+    }
+
+    /**
+     * Hands the event to the {@link EventHandlerService}.
+     *
+     * @param event the event
+     */
+    private void queueEvent(SLACalcStatus event) {
+        Services.get().get(EventHandlerService.class).queueEvent(event);
+    }
+
+    /**
+     * Sets the duration event status (MET or MISS) and queues the event when alerting is enabled. Does nothing when
+     * no duration SLA is configured.
+     *
+     * @param expected the expected duration, or -1 when no duration SLA is configured
+     * @param actual the actual duration
+     * @param slaCalc the sla calc
+     */
+    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
+        if (expected == -1) {
+            return;
+        }
+        slaCalc.setEventStatus(actual > expected ? EventStatus.DURATION_MISS : EventStatus.DURATION_MET);
+        if (shouldAlert(slaCalc)) {
+            queueEvent(new SLACalcStatus(slaCalc));
+        }
+    }
+
+    /**
+     * Writes the SLA object to the DB.
+     *
+     * @throws JPAExecutorException the JPA executor exception
+     */
+    private void writeToDB() throws JPAExecutorException {
+        // no more processing, no transfer to history set
+        if (slaCalc.getEventProcessed() >= 8) {
+            slaCalc.setEventProcessed(8);
+        }
+        // Read the value only after clamping so that the persisted bean and slaCalc agree.
+        byte eventProc = slaCalc.getEventProcessed();
+
+        SLASummaryBean slaSummaryBean = new SLASummaryBean();
+        slaSummaryBean.setId(slaCalc.getId());
+        slaSummaryBean.setEventProcessed(eventProc);
+        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
+        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
+        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+        slaSummaryBean.setActualStart(slaCalc.getActualStart());
+        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+        slaSummaryBean.setLastModifiedTime(new Date());
+
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+                slaSummaryBean);
+
+        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", slaCalc.getId(),
+                slaCalc.getEventProcessed(), slaCalc.getJobStatus());
+    }
+
+    /**
+     * SLA processing for a job that has ended: sets the final SLA status, raises the start, duration and end events
+     * that have not been handled yet and marks the entry as completely processed (8).
+     */
+    private void processForEnd() {
+        byte eventProc = slaCalc.getEventProcessed();
+
+        LOG.debug("Job {0} has ended. endtime = [{1}]", slaCalc.getId(), slaCalc.getActualEnd());
+        if (isEndMiss()) {
+            slaCalc.setSLAStatus(SLAStatus.MISS);
+        }
+        else {
+            slaCalc.setSLAStatus(SLAStatus.MET);
+        }
+        if (eventProc != 8 && slaCalc.getActualStart() != null) {
+            // Start event, only if not handled yet and a start SLA is configured.
+            if ((eventProc & 1) == 0 && slaCalc.getExpectedStart() != null) {
+                if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
+                    slaCalc.setEventStatus(EventStatus.START_MISS);
+                }
+                else {
+                    slaCalc.setEventStatus(EventStatus.START_MET);
+                }
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+            }
+            // The duration can only be computed when the end time is known; subclasses may report an ended job
+            // without an end time, which must not fail the whole command.
+            if (slaCalc.getActualEnd() != null) {
+                slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
+                if (((eventProc >> 1) & 1) == 0) {
+                    processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
+                }
+            }
+        }
+        // End event, only if not handled yet.
+        if (eventProc != 8 && eventProc < 4) {
+            if (isEndMiss()) {
+                slaCalc.setEventStatus(EventStatus.END_MISS);
+            }
+            else {
+                slaCalc.setEventStatus(EventStatus.END_MET);
+            }
+            if (shouldAlert(slaCalc)) {
+                queueEvent(new SLACalcStatus(slaCalc));
+            }
+        }
+        slaCalc.setEventProcessed(8);
+    }
+
+    /**
+     * SLA processing for a job that has not ended: raises start, duration-miss and end-miss events as soon as they
+     * can be decided and records the handled events in the event-processed bit mask.
+     */
+    private void processForRunning() {
+        byte eventProc = slaCalc.getEventProcessed();
+
+        if (eventProc != 8 && slaCalc.getActualStart() != null) {
+            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
+        }
+        // Start event (bit value 1).
+        if (eventProc != 8 && (eventProc & 1) == 0) {
+            if (slaCalc.getExpectedStart() == null) {
+                // No start SLA configured: nothing to report, mark as handled.
+                eventProc++;
+            }
+            else if (slaCalc.getActualStart() != null) {
+                if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
+                    slaCalc.setEventStatus(EventStatus.START_MISS);
+                }
+                else {
+                    slaCalc.setEventStatus(EventStatus.START_MET);
+                }
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+                eventProc++;
+            }
+            else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
+                // Not started yet although the expected start has already passed.
+                slaCalc.setEventStatus(EventStatus.START_MISS);
+                if (shouldAlert(slaCalc)) {
+                    queueEvent(new SLACalcStatus(slaCalc));
+                }
+                eventProc++;
+            }
+        }
+        // Duration event (bit value 2): while running only a miss can be decided.
+        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+            if (slaCalc.getExpectedDuration() == -1) {
+                // No duration SLA configured.
+                eventProc += 2;
+            }
+            else if (slaCalc.getActualStart() != null) {
+                if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
+                    slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                    eventProc += 2;
+                }
+            }
+        }
+        // End event (bit value 4): while running only a miss can be decided.
+        if (eventProc < 4) {
+            if (slaCalc.getExpectedEnd() != null) {
+                if (slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
+                    slaCalc.setEventStatus(EventStatus.END_MISS);
+                    slaCalc.setSLAStatus(SLAStatus.MISS);
+                    if (shouldAlert(slaCalc)) {
+                        queueEvent(new SLACalcStatus(slaCalc));
+                    }
+                    eventProc += 4;
+                }
+            }
+            else {
+                eventProc += 4;
+            }
+        }
+        slaCalc.setEventProcessed(eventProc);
+    }
+
+    /**
+     * @return true if the job was reported as ended
+     */
+    public boolean isEnded() {
+        return isEnded;
+    }
+
+    public void setEnded(boolean isEnded) {
+        this.isEnded = isEnded;
+    }
+
+    /**
+     * @return true if the job was reported as having missed its end SLA
+     */
+    public boolean isEndMiss() {
+        return isEndMiss;
+    }
+
+    public void setEndMiss(boolean isEndMiss) {
+        this.isEndMiss = isEndMiss;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
new file mode 100644
index 0000000..0b4045a
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAJobHistoryXCommand.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import java.util.Date;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
+import org.apache.oozie.sla.SLASummaryBean;
+
+/**
+ * Base command that finalizes the SLA summary of a job that has already ended. It runs under the same lock key as
+ * {@link SLAJobEventXCommand}.
+ * <p>
+ * {@link #execute()} returns {@code true} when the job has ended and the summary update was attempted, and
+ * {@code false} when the job has not finished yet.
+ */
+public abstract class SLAJobHistoryXCommand extends XCommand<Boolean> {
+
+    protected String jobId;
+
+    /**
+     * @param jobId id of the job whose SLA summary should be finalized
+     */
+    public SLAJobHistoryXCommand(String jobId) {
+        super("SLAJobHistoryXCommand", "SLAJobHistoryXCommand", 1);
+        this.jobId = jobId;
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected boolean isReQueueRequired() {
+        return false;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return SLAJobEventXCommand.SLA_LOCK_PREFIX + jobId;
+    }
+
+    protected long getLockTimeOut() {
+        return 0L;
+    }
+
+    /**
+     * Updates the SLA summary if the job has ended.
+     *
+     * @return true if the job has ended, false otherwise
+     * @throws CommandException if the update fails
+     */
+    @Override
+    protected Boolean execute() throws CommandException {
+        if (isJobEnded()) {
+            try {
+                updateSLASummary();
+            }
+            catch (XException e) {
+                throw new CommandException(e);
+            }
+            return true;
+        }
+        else {
+            LOG.debug("Job [{0}] is not finished", jobId);
+        }
+        return false;
+    }
+
+    /**
+     * Checks if is job ended.
+     *
+     * @return true, if is job ended
+     */
+    protected abstract boolean isJobEnded();
+
+    /**
+     * Updates the SLA summary of the loaded job.
+     *
+     * @throws CommandException if the update fails
+     * @throws XException if the update fails
+     */
+    protected abstract void updateSLASummary() throws CommandException, XException;
+
+    /**
+     * Stores the final status and actual times in the SLA summary and marks it as completely processed (8).
+     * Nothing is done when no SLA summary exists for the id, or when the summary already carries the given status
+     * and is already marked as completely processed.
+     *
+     * @param id the id
+     * @param startTime the start time, may be null
+     * @param endTime the end time, may be null
+     * @param status the status
+     * @throws JPAExecutorException the JPA executor exception
+     */
+    protected void updateSLASummary(String id, Date startTime, Date endTime, String status) throws JPAExecutorException {
+        SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
+        // Check for a missing summary before touching it.
+        if (sla == null) {
+            return;
+        }
+        // The stored status may not have been set yet, so guard the comparison.
+        if (sla.getJobStatus() != null && sla.getJobStatus().equals(status) && sla.getEventProcessed() == 8) {
+            LOG.debug("SLA job [{0}] is already updated, eventProc = [{1}], status = [{2}]", sla.getId(),
+                    sla.getEventProcessed(), sla.getJobStatus());
+            return;
+        }
+        sla.setActualStart(startTime);
+        sla.setActualEnd(endTime);
+        if (startTime != null && endTime != null) {
+            sla.setActualDuration(endTime.getTime() - startTime.getTime());
+        }
+        sla.setLastModifiedTime(new Date());
+        sla.setEventProcessed(8);
+        sla.setJobStatus(status);
+        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
+                sla);
+        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", sla.getId(),
+                sla.getEventProcessed(), sla.getJobStatus());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
new file mode 100644
index 0000000..fef77ae
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobEventXCommand.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+public class SLAWorkflowActionJobEventXCommand extends SLAJobEventXCommand {
+ WorkflowActionBean wa;
+
+ public SLAWorkflowActionJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+ super(slaCalc, lockTimeOut);
+ }
+
+ @Override
+ protected void loadState() throws CommandException {
+ try {
+ wa = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_FOR_SLA, slaCalc.getId());
+ }
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
+ }
+ LogUtils.setLogInfo(wa);
+
+ }
+
+
+ @Override
+ protected void updateJobInfo() {
+ if (wa.getEndTime() != null) {
+ setEnded(true);
+ if (wa.isTerminalWithFailure() || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
+ setEndMiss(true);
+ }
+ }
+ slaCalc.setActualStart(wa.getStartTime());
+ slaCalc.setActualEnd(wa.getEndTime());
+ slaCalc.setJobStatus(wa.getStatusStr());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
new file mode 100644
index 0000000..7dc4a3c
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowActionJobHistoryXCommand.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA history command for a workflow action: once the action is complete or has terminated with failure, its
+ * final status and actual times are written to the SLA summary.
+ */
+public class SLAWorkflowActionJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    WorkflowActionBean wfAction = null;
+
+    /**
+     * @param jobId id of the workflow action
+     */
+    public SLAWorkflowActionJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Loads the workflow action identified by the job id.
+     *
+     * @throws CommandException if the query fails
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_COMPLETED, jobId);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wfAction);
+    }
+
+    /**
+     * Updates the SLA summary with the action's start time, end time and status.
+     *
+     * @throws XException if the update fails
+     */
+    @Override
+    protected void updateSLASummary() throws XException {
+        updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
+    }
+
+    @Override
+    protected boolean isJobEnded() {
+        return wfAction.isComplete() || wfAction.isTerminalWithFailure();
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
new file mode 100644
index 0000000..9a72617
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobEventXCommand.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.sla.SLACalcStatus;
+import org.apache.oozie.util.LogUtils;
+
+public class SLAWorkflowJobEventXCommand extends SLAJobEventXCommand {
+ WorkflowJobBean wf;
+
+ public SLAWorkflowJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
+ super(slaCalc, lockTimeOut);
+ }
+
+ @Override
+ protected void loadState() throws CommandException {
+ try {
+ wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, slaCalc.getId());
+ }
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
+ }
+ LogUtils.setLogInfo(wf);
+
+ }
+
+
+ @Override
+ protected void updateJobInfo() {
+ if (wf.inTerminalState()) {
+ setEnded(true);
+ if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
+ || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
+ setEndMiss(true);
+ }
+ slaCalc.setActualEnd(wf.getEndTime());
+ }
+ slaCalc.setActualStart(wf.getStartTime());
+ slaCalc.setJobStatus(wf.getStatusStr());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
new file mode 100644
index 0000000..79e45ee
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/sla/SLAWorkflowJobHistoryXCommand.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.sla;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.util.LogUtils;
+
+/**
+ * SLA history command for a workflow job. It loads the job by id and, through
+ * the parent command, pushes the job's id, start time, end time and status
+ * string into the SLA summary.
+ */
+public class SLAWorkflowJobHistoryXCommand extends SLAJobHistoryXCommand {
+
+    WorkflowJobBean wfJob = null;
+
+    /**
+     * @param jobId id of the workflow job, handed to the parent command
+     */
+    public SLAWorkflowJobHistoryXCommand(String jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Loads the workflow job and passes it to {@link LogUtils#setLogInfo}.
+     * <p>
+     * {@code GET_WORKFLOW_FOR_SLA} is used because {@link #updateSLASummary()}
+     * reads the start and end times, and that query populates id, status,
+     * start time and end time (it is the same query the workflow SLA event
+     * command uses).
+     *
+     * @throws CommandException wrapping a {@link JPAExecutorException} thrown by the query
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, jobId);
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(wfJob);
+    }
+
+    /**
+     * Forwards the loaded job's id, start time, end time and status string to
+     * the four-argument {@code updateSLASummary} inherited from the parent.
+     *
+     * @throws XException propagated from the inherited update
+     */
+    @Override
+    protected void updateSLASummary() throws XException {
+        updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
+    }
+
+    /**
+     * @return {@code true} when the loaded workflow job is in a terminal state
+     */
+    @Override
+    protected boolean isJobEnded() {
+        return wfJob.inTerminalState();
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
deleted file mode 100644
index 8a5b997..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * JPAExecutor to get attributes of CoordinatorActionBean required by SLAService on restart
- */
-public class CoordActionGetForSLAJPAExecutor implements JPAExecutor<CoordinatorActionBean> {
-
- private String coordActionId;
-
- public CoordActionGetForSLAJPAExecutor(String coordActionId) {
- ParamChecker.notNull(coordActionId, "coordActionId");
- this.coordActionId = coordActionId;
- }
-
- @Override
- public String getName() {
- return "CoordActionGetForSLAJPAExecutor";
- }
-
- @Override
- public CoordinatorActionBean execute(EntityManager em) throws JPAExecutorException {
- try {
- Query q = em.createNamedQuery("GET_COORD_ACTION_FOR_SLA");
- q.setParameter("id", coordActionId);
- Object[] obj = (Object[]) q.getSingleResult();
- CoordinatorActionBean caBean = getBeanForRunningCoordAction(obj);
- return caBean;
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
-
- }
-
- private CoordinatorActionBean getBeanForRunningCoordAction(Object[] arr) {
- CoordinatorActionBean bean = new CoordinatorActionBean();
- if (arr[0] != null) {
- bean.setId((String) arr[0]);
- }
- if (arr[1] != null) {
- bean.setJobId((String) arr[1]);
- }
- if (arr[2] != null) {
- bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
- }
- if (arr[3] != null) {
- bean.setExternalId((String) arr[3]);
- }
- if (arr[4] != null) {
- bean.setLastModifiedTime(DateUtils.toDate((Timestamp)arr[4]));
- }
- return bean;
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index 79ec28c..c0e6c19 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -59,7 +59,8 @@ public class CoordActionQueryExecutor extends
GET_TERMINATED_ACTION_IDS_FOR_DATES,
GET_ACTIVE_ACTIONS_FOR_DATES,
GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN,
- GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN
+ GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN,
+ GET_COORD_ACTION_FOR_SLA
};
private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -177,6 +178,7 @@ public class CoordActionQueryExecutor extends
switch (caQuery) {
case GET_COORD_ACTION:
case GET_COORD_ACTION_STATUS:
+ case GET_COORD_ACTION_FOR_SLA:
query.setParameter("id", parameters[0]);
break;
case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
@@ -330,6 +332,15 @@ public class CoordActionQueryExecutor extends
bean.setExternalId((String) arr[3]);
bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
break;
+ case GET_COORD_ACTION_FOR_SLA:
+ arr = (Object[]) ret;
+ bean = new CoordinatorActionBean();
+ bean.setId((String) arr[0]);
+ bean.setJobId((String) arr[1]);
+ bean.setStatusStr((String) arr[2]);
+ bean.setExternalId((String) arr[3]);
+ bean.setLastModifiedTime((Timestamp) arr[4]);
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
index 6663162..6ff9df8 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java
@@ -37,7 +37,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
public enum SLASummaryQuery {
UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
- UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES,
UPDATE_SLA_SUMMARY_ALL,
UPDATE_SLA_SUMMARY_EVENTPROCESSED,
UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
@@ -72,14 +71,6 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu
query.setParameter("actualEndTS", bean.getActualEndTimestamp());
query.setParameter("actualDuration", bean.getActualDuration());
break;
- case UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES:
- query.setParameter("jobId", bean.getId());
- query.setParameter("eventProcessed", bean.getEventProcessed());
- query.setParameter("actualStartTS", bean.getActualStartTimestamp());
- query.setParameter("actualEndTS", bean.getActualEndTimestamp());
- query.setParameter("actualDuration", bean.getActualDuration());
- query.setParameter("lastModifiedTS", bean.getLastModifiedTimestamp());
- break;
case UPDATE_SLA_SUMMARY_ALL:
query.setParameter("appName", bean.getAppName());
query.setParameter("appType", bean.getAppType().toString());
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
deleted file mode 100644
index 280294b..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Retrieve the workflow action bean for sla service
- */
-public class WorkflowActionGetForSLAJPAExecutor implements JPAExecutor<WorkflowActionBean> {
-
- private String wfActionId;
-
- public WorkflowActionGetForSLAJPAExecutor(String wfActionId) {
- ParamChecker.notNull(wfActionId, "wfActionId");
- this.wfActionId = wfActionId;
- }
-
- @Override
- public String getName() {
- return "WorkflowActionGetForSLAJPAExecutor";
- }
-
- @Override
- public WorkflowActionBean execute(EntityManager em) throws JPAExecutorException {
- try {
- Query q = em.createNamedQuery("GET_ACTION_FOR_SLA");
- q.setParameter("id", wfActionId);
- Object[] obj = (Object[]) q.getSingleResult();
- return getBeanFromArray(obj);
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- }
-
- private WorkflowActionBean getBeanFromArray(Object[] arr) {
- WorkflowActionBean wab = new WorkflowActionBean();
- if (arr[0] != null) {
- wab.setId((String) arr[0]);
- }
- if (arr[1] != null) {
- wab.setStatus(WorkflowAction.Status.valueOf((String) arr[1]));
- }
- if (arr[2] != null) {
- wab.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
- }
- if (arr[3] != null) {
- wab.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
- }
- return wab;
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 078fd40..f01f090 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -57,7 +57,8 @@ public class WorkflowActionQueryExecutor extends
GET_ACTION_COMPLETED,
GET_RUNNING_ACTIONS,
GET_PENDING_ACTIONS,
- GET_ACTIONS_FOR_WORKFLOW_RERUN
+ GET_ACTIONS_FOR_WORKFLOW_RERUN,
+ GET_ACTION_FOR_SLA
};
private static WorkflowActionQueryExecutor instance = new WorkflowActionQueryExecutor();
@@ -202,6 +203,7 @@ public class WorkflowActionQueryExecutor extends
case GET_ACTION_CHECK:
case GET_ACTION_END:
case GET_ACTION_COMPLETED:
+ case GET_ACTION_FOR_SLA:
query.setParameter("id", parameters[0]);
break;
case GET_RUNNING_ACTIONS:
@@ -363,6 +365,15 @@ public class WorkflowActionQueryExecutor extends
bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
bean.setType((String) arr[4]);
break;
+ case GET_ACTION_FOR_SLA:
+ bean = new WorkflowActionBean();
+ arr = (Object[]) ret;
+ bean.setId((String) arr[0]);
+ bean.setStatusStr((String) arr[1]);
+ bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+ bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+ break;
+
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
+ namedQuery.name());
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
deleted file mode 100644
index 774766f..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.ParamChecker;
-
-/**
- * Retrieve the workflow job bean for sla service
- */
-public class WorkflowJobGetForSLAJPAExecutor implements JPAExecutor<WorkflowJobBean> {
-
- private String wfJobId;
-
- public WorkflowJobGetForSLAJPAExecutor(String wfJobId) {
- ParamChecker.notNull(wfJobId, "wfJobId");
- this.wfJobId = wfJobId;
- }
-
- @Override
- public String getName() {
- return "WorkflowJobGetForSLAJPAExecutor";
- }
-
- @Override
- public WorkflowJobBean execute(EntityManager em) throws JPAExecutorException {
- try {
- Query q = em.createNamedQuery("GET_WORKFLOW_FOR_SLA");
- q.setParameter("id", wfJobId);
- Object[] obj = (Object[]) q.getSingleResult();
- return getBeanFromArray(obj);
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
- }
-
- private WorkflowJobBean getBeanFromArray(Object[] arr) {
- WorkflowJobBean wjb = new WorkflowJobBean();
- if (arr[0] != null) {
- wjb.setId((String) arr[0]);
- }
- if (arr[1] != null) {
- wjb.setStatus(WorkflowJob.Status.valueOf((String) arr[1]));
- }
- if (arr[2] != null) {
- wjb.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
- }
- if (arr[3] != null) {
- wjb.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
- }
- return wjb;
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
index ce108d5..13fa54d 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
@@ -28,7 +28,6 @@ import javax.persistence.Query;
import org.apache.oozie.BinaryBlob;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.StringBlob;
-import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -60,7 +59,8 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
GET_WORKFLOW_RESUME,
GET_WORKFLOW_STATUS,
GET_WORKFLOWS_PARENT_COORD_RERUN,
- GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN
+ GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN,
+ GET_WORKFLOW_FOR_SLA
};
private static WorkflowJobQueryExecutor instance = new WorkflowJobQueryExecutor();
@@ -171,6 +171,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
case GET_WORKFLOW_KILL:
case GET_WORKFLOW_RESUME:
case GET_WORKFLOW_STATUS:
+ case GET_WORKFLOW_FOR_SLA:
query.setParameter("id", parameters[0]);
break;
case GET_WORKFLOWS_PARENT_COORD_RERUN:
@@ -330,6 +331,14 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
bean.setId((String) arr[0]);
bean.setParentId((String) arr[1]);
break;
+ case GET_WORKFLOW_FOR_SLA:
+ bean = new WorkflowJobBean();
+ arr = (Object[]) ret;
+ bean.setId((String) arr[0]);
+ bean.setStatusStr((String) arr[1]);
+ bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+ bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+ namedQuery.name());
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
index 4246764..9d4dcd9 100644
--- a/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
+++ b/core/src/main/java/org/apache/oozie/service/ConfigurationService.java
@@ -544,12 +544,19 @@ public class ConfigurationService implements Service, Instrumentable {
}
public static long getLong(String name) {
+ return getLong(name, ConfigUtils.LONG_DEFAULT);
+ }
+
+ /**
+ * Looks up {@code name} in the configuration returned by {@code Services.get().getConf()}.
+ *
+ * @param name property name
+ * @param defaultValue fallback handed through to the lookup
+ * @return result of {@code getLong(conf, name, defaultValue)}
+ */
+ public static long getLong(String name, long defaultValue) {
Configuration conf = Services.get().getConf();
- return getLong(conf, name);
+ return getLong(conf, name, defaultValue);
}
public static long getLong(Configuration conf, String name) {
- return conf.getLong(name, ConfigUtils.LONG_DEFAULT);
+ return getLong(conf, name, ConfigUtils.LONG_DEFAULT);
+ }
+
+ /**
+ * Delegates to {@code conf.getLong(name, defaultValue)}.
+ *
+ * @param conf configuration to read from
+ * @param name property name
+ * @param defaultValue fallback handed to {@code conf.getLong}
+ * @return value returned by {@code conf.getLong}
+ */
+ public static long getLong(Configuration conf, String name, long defaultValue) {
+ return conf.getLong(name, defaultValue);
}
public static Class<?>[] getClasses(String name) {
@@ -590,4 +597,5 @@ public class ConfigurationService implements Service, Instrumentable {
Configuration conf = Services.get().getConf();
return getPassword(conf, name, defaultValue);
}
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index 5c0cfd9..3a76dfe 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -25,11 +25,6 @@ import java.util.Map;
import org.apache.oozie.AppType;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.event.SLAEvent;
-import org.apache.oozie.lock.LockToken;
-import org.apache.oozie.service.JobsConcurrencyService;
-import org.apache.oozie.service.MemoryLocksService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
@@ -49,7 +44,6 @@ public class SLACalcStatus extends SLAEvent {
private long actualDuration = -1;
private Date lastModifiedTime;
private byte eventProcessed;
- private LockToken lock;
private XLog LOG;
@@ -293,53 +287,5 @@ public class SLACalcStatus extends SLAEvent {
public String getEntityKey() {
return SLA_ENTITYKEY_PREFIX + this.getId();
}
- /**
- * Obtain an exclusive lock on the {link #getEntityKey}.
- * <p>
- * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
- *
- * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
- */
- public void acquireLock() throws InterruptedException {
- // only get ZK lock when multiple servers running
- if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
- lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
- if (lock == null) {
- LOG.debug("Could not aquire lock for [{0}]", getEntityKey());
- }
- else {
- LOG.debug("Acquired lock for [{0}]", getEntityKey());
- }
- }
- else {
- lock = new DummyToken();
- }
- }
-
- private static class DummyToken implements LockToken {
- @Override
- public void release() {
- }
- }
-
- public boolean isLocked() {
- boolean locked = false;
- if(lock != null) {
- locked = true;
- }
- return locked;
- }
-
- public void releaseLock(){
- if (lock != null) {
- lock.release();
- lock = null;
- LOG.debug("Released lock for [{0}]", getEntityKey());
- }
- }
-
- public long getLockTimeOut() {
- return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000);
- }
}