You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/04/10 02:51:45 UTC
svn commit: r1466307 [1/2] - in /oozie/trunk: ./
client/src/main/java/org/apache/oozie/client/
client/src/main/java/org/apache/oozie/client/event/
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/client/rest/ core/src/main/java/...
Author: mona
Date: Wed Apr 10 00:51:43 2013
New Revision: 1466307
URL: http://svn.apache.org/r1466307
Log:
OOZIE-1209 Event generation and handling for workflow and coordinator (mona)
Added:
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/
oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/
oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/
oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/
oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/
oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java
oozie/trunk/core/src/test/java/org/apache/oozie/event/
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
Modified:
oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java
oozie/trunk/release-log.txt
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java Wed Apr 10 00:51:43 2013
@@ -21,6 +21,9 @@ import java.util.Date;
/**
* Bean that represents a SLA event
+ * @deprecated This interface has been deprecated by the interface
+ * SLAEvent in oozie.client.event package which works with the
+ * new SLA Calculator system OOZIE-1244
*/
public interface SLAEvent {
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event;
+
+/**
+ * This interface defines an Event that can be generated via
+ * Job status changes or SLA related events
+ */
+public interface Event {
+
+ /**
+ * Events will be messages, broadly of type - Job related or SLA related
+ *
+ */
+ public static enum MessageType {
+ JOB,
+ SLA
+ }
+
+ /**
+ * Events carry the associated app-type or job-type to enable toggling on/off
+ * events generated only for specific app-types or filtering on receiving side
+ */
+ public static enum AppType {
+ WORKFLOW_JOB,
+ WORKFLOW_ACTION,
+ COORDINATOR_JOB,
+ COORDINATOR_ACTION,
+ BUNDLE_JOB
+ }
+
+ /**
+ * Get the AppType of the event
+ * @return AppType
+ */
+ public AppType getAppType();
+
+ /**
+ * Get the MessageType of the event
+ * @return MessageType
+ */
+ public MessageType getMsgType();
+
+ /**
+ * Get the app-name of the job generating this event
+ * @return String app-name
+ */
+ public String getAppName();
+
+}
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import org.apache.oozie.client.event.Event;
+
+/**
+ * An abstract implementation of the Event interface, related to
+ * notification events after job status changes
+ */
+public abstract class JobEvent implements Event, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Coarse-grained status of the job event
+ *
+ */
+ public static enum EventStatus {
+ WAITING, STARTED, SUCCESS, SUSPEND, FAILURE
+ }
+
+ private AppType appType;
+ private MessageType msgType;
+ private String id;
+ private String parentId;
+ private EventStatus eventStatus;
+ private String appName;
+ private String user;
+ private Date startTime;
+ private Date endTime;
+
+ public JobEvent(AppType appType, String id, String parentId, String user, String appName) {
+ this.appType = appType;
+ this.msgType = MessageType.JOB;
+ this.id = id;
+ this.parentId = parentId;
+ this.user = user;
+ this.appName = appName;
+ }
+
+ @Override
+ public AppType getAppType() {
+ return appType;
+ }
+
+ public void setAppType(AppType type) {
+ appType = type;
+ }
+
+ @Override
+ public MessageType getMsgType() {
+ return msgType;
+ }
+
+ public void setMsgType(MessageType type) {
+ msgType = type;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getParentId() {
+ return parentId;
+ }
+
+ public void setParentId(String id) {
+ parentId = id;
+ }
+
+ public EventStatus getEventStatus() {
+ return eventStatus;
+ }
+
+ public void setEventStatus(EventStatus eStatus) {
+ eventStatus = eStatus;
+ }
+
+ @Override
+ public String getAppName() {
+ return appName;
+ }
+
+ public void setAppName(String name) {
+ appName = name;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String euser) {
+ user = euser;
+ }
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Date time) {
+ startTime = time;
+ }
+
+ public Date getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(Date time) {
+ endTime = time;
+ }
+
+ @Override
+ public String toString() {
+ return "ID: " + getId() + ", AppType: " + getAppType() + ", Appname: " + getAppName() + ", Status: "
+ + getEventStatus();
+ }
+
+}
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event;
+
+import java.util.Date;
+
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+
+/**
+ * A sub-class of the Event interface, related to
+ * notification events after SLA mets/misses
+ */
+public interface SLAEvent extends Event {
+
+ public static enum EventType {
+ START_MET,
+ START_MISS,
+ END_MET,
+ END_MISS,
+ DURATION_MET,
+ DURATION_MISS
+ }
+
+ /**
+ * Get the SLA event-type
+ * @return SLAEvent.EventType
+ */
+ public EventType getEventType();
+
+ /**
+ * Get the job id
+ * @return String id
+ */
+ public String getId();
+
+ /**
+ * Get the parent job id
+ * @return String id
+ */
+ public String getParentId();
+
+ /**
+ * Get the actual job status
+ * @return EventStatus status
+ */
+ public EventStatus getJobStatus();
+
+ /**
+ * Get the job-id
+ * @return String job-id
+ */
+ public String getJobId();
+
+ /**
+ * Get the sla event sequence-id
+ * @return String sequence-id
+ */
+ public String getSequenceId();
+
+ /**
+ * Get the expected start-time for this job
+ * @return Date expected start-time
+ */
+ public Date getExpectedStart();
+
+ /**
+ * Get the actual start-time for this job
+ * @return Date actual start-time
+ */
+ public Date getActualStart();
+
+ /**
+ * Get the expected end-time for this job
+ * @return Date expected end-time
+ */
+ public Date getExpectedEnd();
+
+ /**
+ * Get the actual end-time for this job
+ * @return Date actual end-time
+ */
+ public Date getActualEnd();
+
+ /**
+ * Get the expected duration for this job
+ * @return Date expected duration
+ */
+ public long getExpectedDuration();
+
+ /**
+ * Get the actual duration for this job
+ * @return Date actual duration
+ */
+ public long getActualDuration();
+
+ /**
+ * Get the sla notification-message
+ * @return String notification-message
+ */
+ public String getNotificationMsg();
+
+ /**
+ * Get the SLA alert-contact
+ * @return String alert-contact
+ */
+ public String getAlertContact();
+
+ /**
+ * Get the SLA dev-contact
+ * @return String dev-contact
+ */
+ public String getDevContact();
+
+ /**
+ * Get the SLA qa-contact
+ * @return String qa-contact
+ */
+ public String getQaContact();
+
+ /**
+ * Get the SLA alert-contact
+ * @return String alert-contact
+ */
+ public String getSeContact();
+
+ /**
+ * Get the SLA alert-frequency
+ * @return String alert-frequency
+ */
+ public String getAlertFrequency();
+
+ /**
+ * Get the SLA alert-percentage
+ * @return String alert-percentage
+ */
+ public String getAlertPercentage();
+
+ /**
+ * Get the dependent upstream apps
+ * @return String upstream-apps
+ */
+ public String getUpstreamApps();
+
+ /**
+ * Get job related data or configuration
+ * @return String job-data
+ */
+ public String getJobData();
+
+ /**
+ * Get last modified time of this sla event
+ * @return Date last-modified-time
+ */
+ public Date getLastModified();
+
+ /**
+ * Get the user for this job sla
+ * @return String user
+ */
+ public String getUser();
+
+ /**
+ * Get the nominal time for this job under sla
+ * @return Date nominalTime
+ */
+ public Date getNominalTime();
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Wed Apr 10 00:51:43 2013
@@ -77,19 +77,19 @@ import org.apache.openjpa.persistence.jd
// Select query used by ActionInfo command
@NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
// Select Query used by Timeout command
- @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending from CoordinatorActionBean a where a.id = :id"),
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
// Select query used by InputCheck command
@NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
// Select query used by CoordActionUpdate command
- @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.externalId = :externalId"),
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"),
// Select query used by Check command
- @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.id = :id"),
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
// Select query used by Start command
- @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.status, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode from CoordinatorActionBean a where a.id = :id"),
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.status, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
- @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.status, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"),
- @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.status, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"),
@NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"),
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java Wed Apr 10 00:51:43 2013
@@ -73,7 +73,9 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_COORD_JOBS_WITH_PARENT_ID", query = "select w.id from CoordinatorJobBean w where w.bundleId = :parentId"),
- @NamedQuery(name = "GET_COORD_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from CoordinatorJobBean w where w.bundleId = :parentId and (w.status NOT IN ('SUCCEEDED', 'FAILED', 'KILLED', 'DONEWITHERROR') OR w.lastModifiedTimestamp >= :lastModTime)")
+ @NamedQuery(name = "GET_COORD_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from CoordinatorJobBean w where w.bundleId = :parentId and (w.status NOT IN ('SUCCEEDED', 'FAILED', 'KILLED', 'DONEWITHERROR') OR w.lastModifiedTimestamp >= :lastModTime)"),
+
+ @NamedQuery(name = "GET_COORD_JOB_FOR_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id")
})
public class CoordinatorJobBean extends JsonCoordinatorJob implements Writable {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java Wed Apr 10 00:51:43 2013
@@ -300,7 +300,7 @@ public class JsonCoordinatorAction imple
@Override
public String toString() {
- return MessageFormat.format("WorkflowAction name[{0}] status[{1}]",
+ return MessageFormat.format("CoordinatorAction name[{0}] status[{1}]",
getId(), getStatus());
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java Wed Apr 10 00:51:43 2013
@@ -21,6 +21,7 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
import org.apache.oozie.XException;
import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.Services;
@@ -77,6 +78,7 @@ public abstract class XCommand<T> implem
protected Instrumentation instrumentation;
protected XLog.Info logInfo;
+ protected static EventHandlerService eventService;
/**
* Create a command.
@@ -93,6 +95,7 @@ public abstract class XCommand<T> implem
createdTime = System.currentTimeMillis();
logInfo = new XLog.Info();
instrumentation = Services.get().get(InstrumentationService.class).get();
+ eventService = Services.get().get(EventHandlerService.class);
}
/**
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Wed Apr 10 00:51:43 2013
@@ -23,10 +23,12 @@ import java.util.Date;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -42,6 +44,7 @@ import org.apache.oozie.command.CommandE
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
/**
@@ -69,7 +72,6 @@ public class CoordActionCheckXCommand ex
try {
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetJPAExecutor(coordAction.getExternalId()));
-
Status slaStatus = null;
if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) {
@@ -116,6 +118,11 @@ public class CoordActionCheckXCommand ex
}
jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
+ if (EventHandlerService.isEventsConfigured()) {
+ CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
+ coordAction.getJobId()));
+ generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ }
}
catch (XException ex) {
LOG.warn("CoordActionCheckCommand Failed ", ex);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Wed Apr 10 00:51:43 2013
@@ -43,6 +43,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
@@ -164,7 +165,7 @@ public class CoordActionInputCheckXComma
}
else {
if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
- queue(new CoordActionTimeOutXCommand(coordAction));
+ queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
}
else {
// Let CoordPushDependencyCheckXCommand queue the timeout
@@ -176,7 +177,8 @@ public class CoordActionInputCheckXComma
if (isTimeout(currentTime)) {
LOG.debug("Queueing timeout command");
// XCommand.queue() will not work when there is a Exception
- Services.get().get(CallableQueueService.class).queue(new CoordActionTimeOutXCommand(coordAction));
+ Services.get().get(CallableQueueService.class)
+ .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
}
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
@@ -210,6 +212,11 @@ public class CoordActionInputCheckXComma
try {
if (isChangeInDependency) {
jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
+ if (EventHandlerService.isEventsConfigured()
+ && coordAction.getStatus() != CoordinatorAction.Status.READY) {
+ //since event is not to be generated unless action RUNNING via StartX
+ generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ }
}
else {
jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java Wed Apr 10 00:51:43 2013
@@ -109,7 +109,8 @@ public class CoordActionReadyXCommand ex
// change state of action to SUBMITTED
action.setStatus(CoordinatorAction.Status.SUBMITTED);
// queue action to start action
- queue(new CoordActionStartXCommand(action.getId(), user, authToken, action.getJobId()), 100);
+ queue(new CoordActionStartXCommand(action.getId(), user, coordJob.getAppName(), authToken,
+ action.getJobId()), 100);
try {
jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(action));
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java Wed Apr 10 00:51:43 2013
@@ -30,6 +30,7 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.service.DagEngineService;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.JobUtils;
@@ -43,6 +44,7 @@ import org.apache.oozie.client.SLAEvent.
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStartJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.jdom.Element;
@@ -65,6 +67,7 @@ public class CoordActionStartXCommand ex
private final XLog log = getLog();
private String actionId = null;
private String user = null;
+ private String appName = null;
private String authToken = null;
private CoordinatorActionBean coordAction = null;
private JPAService jpaService = null;
@@ -72,11 +75,12 @@ public class CoordActionStartXCommand ex
private List<JsonBean> updateList = new ArrayList<JsonBean>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
- public CoordActionStartXCommand(String id, String user, String token, String jobId) {
+ public CoordActionStartXCommand(String id, String user, String appName, String token, String jobId) {
//super("coord_action_start", "coord_action_start", 1, XLog.OPS);
super("coord_action_start", "coord_action_start", 1);
this.actionId = ParamChecker.notEmpty(id, "id");
this.user = ParamChecker.notEmpty(user, "user");
+ this.appName = ParamChecker.notEmpty(appName, "appName");
this.authToken = ParamChecker.notEmpty(token, "token");
this.jobId = jobId;
}
@@ -190,6 +194,9 @@ public class CoordActionStartXCommand ex
updateList.add(coordAction);
try {
jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
+ if (EventHandlerService.isEventsConfigured()) {
+ generateEvent(coordAction, user, appName);
+ }
}
catch (JPAExecutorException je) {
throw new CommandException(je);
@@ -243,6 +250,9 @@ public class CoordActionStartXCommand ex
try {
// call JPAExecutor to do the bulk writes
jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
+ if (EventHandlerService.isEventsConfigured()) {
+ generateEvent(coordAction, user, appName);
+ }
}
catch (JPAExecutorException je) {
throw new CommandException(je);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java Wed Apr 10 00:51:43 2013
@@ -27,6 +27,7 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.executor.jpa.CoordActionGetForTimeoutJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
@@ -37,12 +38,15 @@ import org.apache.oozie.util.ParamChecke
*/
public class CoordActionTimeOutXCommand extends CoordinatorXCommand<Void> {
private CoordinatorActionBean actionBean;
+ private String user;
+ private String appName;
private JPAService jpaService = null;
- public CoordActionTimeOutXCommand(CoordinatorActionBean actionBean) {
+ public CoordActionTimeOutXCommand(CoordinatorActionBean actionBean, String user, String appName) {
super("coord_action_timeout", "coord_action_timeout", 1);
- ParamChecker.notNull(actionBean, "ActionBean");
- this.actionBean = actionBean;
+ this.actionBean = ParamChecker.notNull(actionBean, "ActionBean");
+ this.user = ParamChecker.notEmpty(user, "user");
+ this.appName = ParamChecker.notEmpty(appName, "appName");
}
/* (non-Javadoc)
@@ -52,10 +56,13 @@ public class CoordActionTimeOutXCommand
protected Void execute() throws CommandException {
if (actionBean.getStatus() == CoordinatorAction.Status.WAITING) {
actionBean.setStatus(CoordinatorAction.Status.TIMEDOUT);
- queue(new CoordActionNotificationXCommand(actionBean), 100);
- actionBean.setLastModifiedTime(new Date());
try {
+ queue(new CoordActionNotificationXCommand(actionBean), 100);
+ actionBean.setLastModifiedTime(new Date());
jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(actionBean));
+ if (EventHandlerService.isEventsConfigured()) {
+ generateEvent(actionBean, user, appName);
+ }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Wed Apr 10 00:51:43 2013
@@ -22,10 +22,12 @@ import java.util.Date;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
@@ -39,6 +41,7 @@ import org.apache.oozie.command.CommandE
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
@@ -64,7 +67,6 @@ public class CoordActionUpdateXCommand e
protected Void execute() throws CommandException {
try {
LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
-
Status slaStatus = null;
CoordinatorAction.Status preCoordStatus = coordAction.getStatus();
if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
@@ -131,6 +133,11 @@ public class CoordActionUpdateXCommand e
}
jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
+ if (EventHandlerService.isEventsConfigured()) {
+ CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(coordAction
+ .getJobId()));
+ generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ }
LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Wed Apr 10 00:51:43 2013
@@ -42,6 +42,7 @@ import org.apache.oozie.executor.jpa.Bul
import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
@@ -111,6 +112,9 @@ public class CoordMaterializeTransitionX
for (JsonBean actionBean : insertList) {
if (actionBean instanceof CoordinatorActionBean) {
CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
+ if (EventHandlerService.isEventsConfigured()) {
+ CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ }
if (coordAction.getPushMissingDependencies() != null) {
// TODO: Delay in catchup mode?
queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
@@ -259,7 +263,7 @@ public class CoordMaterializeTransitionX
coordJob.resetPending();
}
catch (Exception e) {
- LOG.error("Excepion thrown :", e);
+ LOG.error("Exception thrown :", e);
throw new CommandException(ErrorCode.E1001, e.getMessage(), e);
}
cron.stop();
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Wed Apr 10 00:51:43 2013
@@ -42,6 +42,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.RecoveryService;
import org.apache.oozie.service.Service;
@@ -140,7 +141,7 @@ public class CoordPushDependencyCheckXCo
// Checking for timeout
timeout = isTimeout();
if (timeout) {
- queue(new CoordActionTimeOutXCommand(coordAction));
+ queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
}
else {
queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
@@ -164,7 +165,7 @@ public class CoordPushDependencyCheckXCo
if (isTimeout()) {
LOG.debug("Queueing timeout command");
// XCommand.queue() will not work when there is a Exception
- callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction));
+ callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
}
else if (coordAction.getMissingDependencies() != null
@@ -252,6 +253,11 @@ public class CoordPushDependencyCheckXCo
try {
if (isChangeInDependency) {
jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+ if (EventHandlerService.isEventsConfigured()
+ && coordAction.getStatus() != CoordinatorAction.Status.READY) {
+ //since event is not to be generated unless action RUNNING via StartX
+ generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ }
}
else {
jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Wed Apr 10 00:51:43 2013
@@ -35,6 +35,7 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
@@ -45,6 +46,7 @@ import org.apache.oozie.coord.CoordUtils
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -404,6 +406,14 @@ public class CoordRerunXCommand extends
public void performWrites() throws CommandException {
try {
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ if (EventHandlerService.isEventsConfigured()) {
+ for (JsonBean bean : updateList) {
+ if (bean instanceof CoordinatorActionBean) {
+ CoordinatorXCommand.generateEvent((CoordinatorActionBean) bean, coordJob.getUser(),
+ coordJob.getAppName());
+ }
+ }
+ }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java Wed Apr 10 00:51:43 2013
@@ -17,7 +17,14 @@
*/
package org.apache.oozie.command.coord;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.event.Event.AppType;
import org.apache.oozie.command.XCommand;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
+import org.apache.oozie.util.ParamChecker;
/**
* Abstract coordinator command class derived from XCommand
@@ -47,4 +54,38 @@ public abstract class CoordinatorXComman
super(name, type, priority, dryrun);
}
+ protected static void generateEvent(CoordinatorActionBean coordAction, String user, String appName) {
+ if (eventService.checkSupportedApptype(AppType.COORDINATOR_ACTION.name())) {
+ ParamChecker.notNull(coordAction, "coordAction");
+ ParamChecker.notNull(user, "user");
+ ParamChecker.notNull(appName, "appName");
+ String missDep = coordAction.getMissingDependencies();
+ if (missDep != null && missDep.length() > 0) {
+ missDep = missDep.split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
+ }
+ String pushMissDep = coordAction.getPushMissingDependencies();
+ if (pushMissDep != null && pushMissDep.length() > 0) {
+ pushMissDep = pushMissDep.split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
+ }
+ String deps = missDep == null ? (pushMissDep == null ? null : pushMissDep) : (pushMissDep == null ? missDep
+ : missDep + CoordELFunctions.INSTANCE_SEPARATOR + pushMissDep);
+ CoordinatorActionEvent event = new CoordinatorActionEvent(coordAction.getId(), coordAction.getJobId(),
+ coordAction.getStatus(), user, appName, coordAction.getNominalTime(), coordAction.getCreatedTime(),
+ deps);
+ event.setErrorCode(coordAction.getErrorCode());
+ event.setErrorMessage(coordAction.getErrorMessage());
+ eventService.queueEvent(event);
+ }
+ }
+
+ protected void generateEvent(CoordinatorJobBean coordJob) {
+ if (eventService.checkSupportedApptype(AppType.COORDINATOR_JOB.name())) {
+ ParamChecker.notNull(coordJob, "coordJob");
+ CoordinatorJobEvent event = new CoordinatorJobEvent(coordJob.getId(), coordJob.getBundleId(),
+ coordJob.getStatus(), coordJob.getUser(), coordJob.getAppName(), coordJob.getStartTime(),
+ coordJob.getEndTime());
+ eventService.queueEvent(event);
+ }
+ }
+
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Wed Apr 10 00:51:43 2013
@@ -33,6 +33,7 @@ import org.apache.oozie.executor.jpa.Bul
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.workflow.WorkflowException;
@@ -155,6 +156,9 @@ public class KillXCommand extends Workfl
wfJob.setLastModifiedTime(new Date());
updateList.add(wfJob);
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ if (EventHandlerService.isEventsConfigured()) {
+ generateEvent(wfJob, null, null);
+ }
queue(new NotificationXCommand(wfJob));
}
catch (JPAExecutorException e) {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java Wed Apr 10 00:51:43 2013
@@ -41,6 +41,7 @@ import org.apache.oozie.executor.jpa.Bul
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -74,7 +75,6 @@ public class ResumeXCommand extends Work
workflow.setWorkflowInstance(wfInstance);
workflow.setStatus(WorkflowJob.Status.RUNNING);
-
//for (WorkflowActionBean action : store.getActionsForWorkflow(id, false)) {
for (WorkflowActionBean action : jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id))) {
@@ -130,6 +130,9 @@ public class ResumeXCommand extends Work
workflow.setLastModifiedTime(new Date());
updateList.add(workflow);
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+ if (EventHandlerService.isEventsConfigured()) {
+ generateEvent(workflow);
+ }
queue(new NotificationXCommand(workflow));
}
return null;
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Apr 10 00:51:43 2013
@@ -37,6 +37,7 @@ import org.apache.oozie.executor.jpa.JPA
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.ELService;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
@@ -311,6 +312,14 @@ public class SignalXCommand extends Work
updateList.add(wfJob);
// call JPAExecutor to do the bulk writes
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ if (EventHandlerService.isEventsConfigured()) {
+ if (wfAction != null) {
+ generateEvent(wfJob, wfAction.getErrorCode(), wfAction.getErrorMessage());
+ }
+ else {
+ generateEvent(wfJob, null, null);
+ }
+ }
}
catch (JPAExecutorException je) {
throw new CommandException(je);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java Wed Apr 10 00:51:43 2013
@@ -33,6 +33,7 @@ import org.apache.oozie.executor.jpa.Bul
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -100,6 +101,9 @@ public class SuspendXCommand extends Wor
workflow.setWorkflowInstance(wfInstance);
setPendingFalseForActions(jpaService, id, actionId, updateList);
+ if (EventHandlerService.isEventsConfigured()) {
+ generateEvent(workflow);
+ }
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java Wed Apr 10 00:51:43 2013
@@ -17,7 +17,13 @@
*/
package org.apache.oozie.command.wf;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.event.Event.AppType;
import org.apache.oozie.command.XCommand;
+import org.apache.oozie.event.WorkflowActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.util.ParamChecker;
/**
* Abstract coordinator command class derived from XCommand
@@ -48,4 +54,30 @@ public abstract class WorkflowXCommand<T
super(name, type, priority, dryrun);
}
+ protected static void generateEvent(WorkflowJobBean wfJob, String errorCode, String errorMsg) {
+ if (eventService.checkSupportedApptype(AppType.WORKFLOW_JOB.name())) {
+ ParamChecker.notNull(wfJob, "wfJob");
+ WorkflowJobEvent event = new WorkflowJobEvent(wfJob.getId(), wfJob.getParentId(), wfJob.getStatus(),
+ wfJob.getUser(), wfJob.getAppName(), wfJob.getStartTime(), wfJob.getEndTime());
+ event.setErrorCode(errorCode);
+ event.setErrorMessage(errorMsg);
+ eventService.queueEvent(event);
+ }
+ }
+
+ protected static void generateEvent(WorkflowJobBean wfJob) {
+ generateEvent(wfJob, null, null);
+ }
+
+ protected void generateEvent(WorkflowActionBean wfAction, String wfUser) {
+ if (eventService.checkSupportedApptype(AppType.WORKFLOW_ACTION.name())) {
+ ParamChecker.notNull(wfAction, "wfAction");
+ WorkflowActionEvent event = new WorkflowActionEvent(wfAction.getId(), wfAction.getJobId(),
+ wfAction.getStatus(), wfUser, wfAction.getName(), wfAction.getStartTime(), wfAction.getEndTime());
+ event.setErrorCode(wfAction.getErrorCode());
+ event.setErrorMessage(wfAction.getErrorMessage());
+ eventService.queueEvent(event);
+ }
+ }
+
}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Bundle Jobs
+ *
+ */
+public class BundleJobEvent extends JobEvent {
+
+ private BundleJob.Status status;
+
+ public BundleJobEvent(String id, BundleJob.Status status, String user, String appName, Date startTime, Date endTime) {
+ super(AppType.BUNDLE_JOB, id, null, user, appName); //parentId is null
+ setStatus(status);
+ setStartTime(startTime);
+ setEndTime(endTime);
+ XLog.getLog(EventHandlerService.class).debug("Event generated - " + this.toString());
+ }
+
+ public BundleJob.Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(BundleJob.Status bstatus) {
+ status = bstatus;
+ // set high-level status for event based on low-level actual job status
+ // this is to ease filtering on the consumer side
+ switch (status) {
+ case SUCCEEDED:
+ setEventStatus(EventStatus.SUCCESS);
+ break;
+ case RUNNING:
+ setEventStatus(EventStatus.STARTED);
+ break;
+ case SUSPENDED:
+ case SUSPENDEDWITHERROR:
+ setEventStatus(EventStatus.SUSPEND);
+ break;
+ case KILLED:
+ case FAILED:
+ case DONEWITHERROR:
+ setEventStatus(EventStatus.FAILURE);
+ }
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Coordinator Actions
+ */
+public class CoordinatorActionEvent extends JobEvent {
+
+ private CoordinatorAction.Status status;
+ private Date nominalTime;
+ private String missingDeps;
+ private String errorCode;
+ private String errorMessage;
+ // TODO more attributes - frequency, timeunit, bundleName
+ // for some advanced processing and linking using events
+
+ public CoordinatorActionEvent(String id, String parentId, CoordinatorAction.Status status, String user,
+ String appName, Date nomTime, Date startTime, String missDeps) {
+ super(AppType.COORDINATOR_ACTION, id, parentId, user, appName);
+ setStatus(status);
+ setNominalTime(nomTime);
+ setStartTime(startTime);
+ setMissingDeps(missDeps);
+ XLog.getLog(EventHandlerService.class).debug("Event generated - " + this.toString());
+ }
+
+ public String getBundleJobId() {
+ return null; // TODO extract prefix from bundleActionId before '@'
+ }
+
+ public CoordinatorAction.Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(CoordinatorAction.Status castatus) {
+ status = castatus;
+ // set high-level status for event based on low-level actual job status
+ // this is to ease filtering on the consumer side
+ switch (status) {
+ case WAITING:
+ setEventStatus(EventStatus.WAITING);
+ break;
+ case SUCCEEDED:
+ setEventStatus(EventStatus.SUCCESS);
+ setEndTime(new Date());
+ break;
+ case RUNNING:
+ setEventStatus(EventStatus.STARTED);
+ break;
+ case SUSPENDED:
+ setEventStatus(EventStatus.SUSPEND);
+ break;
+ case KILLED:
+ case FAILED:
+ case TIMEDOUT:
+ setEventStatus(EventStatus.FAILURE);
+ setEndTime(new Date());
+ }
+ }
+
+ public Date getNominalTime() {
+ return nominalTime;
+ }
+
+ public void setNominalTime(Date time) {
+ nominalTime = time;
+ }
+
+ public String getMissingDeps() {
+ return missingDeps;
+ }
+
+ public void setMissingDeps(String dependencies) {
+ missingDeps = dependencies;
+ }
+
+ public String getErrorCode() {
+ return errorCode;
+ }
+
+ public void setErrorCode(String code) {
+ errorCode = code;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(String msg) {
+ errorCode = msg;
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Coordinator Jobs
+ */
+public class CoordinatorJobEvent extends JobEvent {
+
+ private CoordinatorJob.Status status;
+
+ public CoordinatorJobEvent(String id, String parentId, CoordinatorJob.Status status, String user, String appName,
+ Date startTime, Date endTime) {
+ super(AppType.COORDINATOR_JOB, id, parentId, user, appName);
+ setStatus(status);
+ setStartTime(startTime);
+ setEndTime(endTime);
+ XLog.getLog(EventHandlerService.class).debug("Event generated - " + this.toString());
+ }
+
+ public CoordinatorJob.Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(CoordinatorJob.Status coordStatus) {
+ status = coordStatus;
+ // set high-level status for event based on low-level actual job status
+ // this is to ease filtering on the consumer side
+ switch (status) {
+ case SUCCEEDED:
+ setEventStatus(EventStatus.SUCCESS);
+ break;
+ case RUNNING:
+ setEventStatus(EventStatus.STARTED);
+ break;
+ case SUSPENDED:
+ case PREPSUSPENDED:
+ case SUSPENDEDWITHERROR:
+ setEventStatus(EventStatus.SUSPEND);
+ break;
+ case KILLED:
+ case FAILED:
+ case DONEWITHERROR:
+ setEventStatus(EventStatus.FAILURE);
+ }
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.oozie.client.event.Event;
+
+/**
+ * Interface to define the queue operations for the events system
+ */
+public interface EventQueue {
+
+ public class EventQueueElement implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ Event event;
+
+ public EventQueueElement(Event e) {
+ event = e;
+ }
+ }
+
+ /**
+ * Initialize the event queue
+ * @param queueSize
+ * @param batchSize
+ */
+ public void init(int queueSize, int batchSize);
+
+ /**
+ * Add event to queue
+ * @param event
+ */
+ public void add(Event e);
+
+ /**
+ * Fetch events from queue in batch
+ * @return events set
+ */
+ public Set<Event> pollBatch();
+
+ /**
+ * Fetch single event from queue
+ * @return event
+ */
+ public Event poll();
+
+ /**
+ * Find out if queue is empty
+ * @return boolean
+ */
+ public boolean isEmpty();
+
+ /**
+ * Get current queue size
+ * @return size
+ */
+ public int getCurrentSize();
+
+ /**
+ * Read topmost event from queue but do not pop from it
+ * @return event
+ */
+ public Event peek();
+
+ /**
+ * Get the batch size used during polling events
+ * @return batchSize
+ */
+ public int getBatchSize();
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.util.XLog;
+
+/**
+ * An implementation of the EventQueue, defining a memory-based data structure
+ * holding the events
+ */
+public class MemoryEventQueue implements EventQueue {
+
+ private static ConcurrentLinkedQueue<EventQueueElement> eventQueue;
+ private static AtomicInteger currentSize;
+ private static int maxSize;
+ private static XLog LOG;
+ private static int batchSize;
+
+ @Override
+ public void init(int queueSize, int batchsize) {
+ eventQueue = new ConcurrentLinkedQueue<EventQueueElement>();
+ maxSize = queueSize;
+ currentSize = new AtomicInteger();
+ batchSize = batchsize;
+ LOG = XLog.getLog(getClass());
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public void add(Event e) {
+ EventQueueElement eqe = new EventQueueElement(e);
+ try {
+ if(getCurrentSize() <= maxSize) {
+ if(eventQueue.add(eqe)) {
+ currentSize.incrementAndGet();
+ }
+ }
+ else {
+ LOG.warn("Queue size [{0}] reached max limit. Element [{1}] not added", getCurrentSize(), e);
+ }
+ }
+ catch (IllegalStateException ise) {
+ LOG.warn("Unable to add event due to " + ise);
+ }
+ }
+
+ @Override
+ public Set<Event> pollBatch() {
+ // batch drain
+ Set<Event> eventBatch = new HashSet<Event>();
+ for (int i = 0; i < batchSize; i++) {
+ EventQueueElement polled = eventQueue.poll();
+ if (polled != null) {
+ currentSize.decrementAndGet();
+ eventBatch.add(polled.event);
+ }
+ else {
+ LOG.debug("Current queue size [{0}] less than polling batch size [{1}]", currentSize.get(), batchSize);
+ break;
+ }
+ }
+ return eventBatch;
+ }
+
+ @Override
+ public Event poll() {
+ EventQueueElement polled = eventQueue.poll();
+ if (polled != null) {
+ currentSize.decrementAndGet();
+ return polled.event;
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return getCurrentSize() == 0;
+ }
+
+ @Override
+ public int getCurrentSize() {
+ return currentSize.intValue();
+ }
+
+ @Override
+ public Event peek() {
+ EventQueueElement peeked = eventQueue.peek();
+ if (peeked != null) {
+ return peeked.event;
+ }
+ return null;
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Workflow Actions
+ */
+public class WorkflowActionEvent extends JobEvent {
+
+ private WorkflowAction.Status status;
+ private String hadoopId;
+ private String errorCode;
+ private String errorMessage;
+
+ public WorkflowActionEvent(String id, String parentId, WorkflowAction.Status status, String user, String appName,
+ Date startTime, Date endTime) {
+ super(AppType.WORKFLOW_ACTION, id, parentId, user, appName);
+ setStatus(status);
+ setStartTime(startTime);
+ setEndTime(endTime);
+ XLog.getLog(EventHandlerService.class).debug("Event generated - " + this.toString());
+ }
+
+ public WorkflowAction.Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(WorkflowAction.Status actionStatus) {
+ status = actionStatus;
+ // set high-level status for event based on low-level actual job status
+ // this is to ease filtering on the consumer side
+ switch (actionStatus) {
+ case OK:
+ setEventStatus(EventStatus.SUCCESS);
+ break;
+ case RUNNING:
+ setEventStatus(EventStatus.STARTED);
+ break;
+ case ERROR:
+ case KILLED:
+ case FAILED:
+ setEventStatus(EventStatus.FAILURE);
+ case START_MANUAL:
+ case END_MANUAL:
+ setEventStatus(EventStatus.SUSPEND);
+ }
+ }
+
+ public String getHadoopId() {
+ return hadoopId;
+ }
+
+ public void setHadoopId(String id) {
+ hadoopId = id;
+ }
+
+ public String getErrorCode() {
+ return errorCode;
+ }
+
+ public void setErrorCode(String code) {
+ errorCode = code;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(String msg) {
+ errorCode = msg;
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Workflow Jobs
+ */
+public class WorkflowJobEvent extends JobEvent {
+
+ private WorkflowJob.Status status;
+ private String errorCode;
+ private String errorMessage;
+ // TODO more attributes - run, coordName, bundleId
+ // for some advanced processing and linking using events
+
+ public WorkflowJobEvent(String id, String parentId, WorkflowJob.Status status, String user, String appName,
+ Date startTime, Date endTime) {
+ super(AppType.WORKFLOW_JOB, id, parentId, user, appName);
+ setStatus(status);
+ setStartTime(startTime);
+ setEndTime(endTime);
+ XLog.getLog(EventHandlerService.class).debug("Event generated - " + this.toString());
+ }
+
+ public String getCoordJobId() {
+ return null; // TODO extract prefix from coordActionId before '@'
+ }
+
+ public WorkflowJob.Status getStatus() {
+ return status;
+ }
+
+ public String getErrorCode() {
+ return errorCode;
+ }
+
+ public void setErrorCode(String code) {
+ errorCode = code;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(String msg) {
+ errorMessage = msg;
+ }
+
+ public void setStatus(WorkflowJob.Status wfstatus) {
+ status = wfstatus;
+ // set high-level status for event based on low-level actual job status
+ // this is to ease filtering on the consumer side
+ switch (status) {
+ case SUCCEEDED:
+ setEventStatus(EventStatus.SUCCESS);
+ break;
+ case RUNNING:
+ setEventStatus(EventStatus.STARTED);
+ break;
+ case SUSPENDED:
+ setEventStatus(EventStatus.SUSPEND);
+ break;
+ case KILLED:
+ case FAILED:
+ setEventStatus(EventStatus.FAILURE);
+ }
+ }
+
+}