You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/04/10 02:51:45 UTC

svn commit: r1466307 [1/2] - in /oozie/trunk: ./ client/src/main/java/org/apache/oozie/client/ client/src/main/java/org/apache/oozie/client/event/ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/client/rest/ core/src/main/java/...

Author: mona
Date: Wed Apr 10 00:51:43 2013
New Revision: 1466307

URL: http://svn.apache.org/r1466307
Log:
OOZIE-1209 Event generation and handling for workflow and coordinator (mona)

Added:
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/
    oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/
    oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java
    oozie/trunk/core/src/test/java/org/apache/oozie/event/
    oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
    oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
Modified:
    oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java
    oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
    oozie/trunk/core/src/main/resources/oozie-default.xml
    oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/SLAEvent.java Wed Apr 10 00:51:43 2013
@@ -21,6 +21,9 @@ import java.util.Date;
 
 /**
  * Bean that represents a SLA event
+ * @deprecated Superseded by the SLAEvent interface in the
+ * oozie.client.event package, which works with the
+ * new SLA Calculator system (OOZIE-1244)
  */
 public interface SLAEvent {
 

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event;
+
+/**
+ * This interface defines an Event that can be generated via
+ * Job status changes or SLA related events
+ */
+public interface Event {
+
+    /**
+     * Events will be messages, broadly of type - Job related or SLA related
+     *
+     */
+    public static enum MessageType {
+        JOB,
+        SLA
+    }
+
+    /**
+     * Events carry the associated app-type or job-type to enable toggling on/off
+     * events generated only for specific app-types or filtering on receiving side
+     */
+    public static enum AppType {
+        WORKFLOW_JOB,
+        WORKFLOW_ACTION,
+        COORDINATOR_JOB,
+        COORDINATOR_ACTION,
+        BUNDLE_JOB
+    }
+
+    /**
+     * Get the AppType of the event
+     * @return AppType
+     */
+    public AppType getAppType();
+
+    /**
+     * Get the MessageType of the event
+     * @return MessageType
+     */
+    public MessageType getMsgType();
+
+    /**
+     * Get the app-name of the job generating this event
+     * @return String app-name
+     */
+    public String getAppName();
+
+}

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/JobEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import org.apache.oozie.client.event.Event;
+
+/**
+ * An abstract implementation of the Event interface, related to
+ * notification events after job status changes
+ */
+public abstract class JobEvent implements Event, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Coarse-grained status of the job event
+     *
+     */
+    public static enum EventStatus {
+        WAITING, STARTED, SUCCESS, SUSPEND, FAILURE
+    }
+
+    private AppType appType;
+    private MessageType msgType;
+    private String id;
+    private String parentId;
+    private EventStatus eventStatus;
+    private String appName;
+    private String user;
+    private Date startTime;
+    private Date endTime;
+
+    public JobEvent(AppType appType, String id, String parentId, String user, String appName) {
+        this.appType = appType;
+        this.msgType = MessageType.JOB;
+        this.id = id;
+        this.parentId = parentId;
+        this.user = user;
+        this.appName = appName;
+    }
+
+    @Override
+    public AppType getAppType() {
+        return appType;
+    }
+
+    public void setAppType(AppType type) {
+        appType = type;
+    }
+
+    @Override
+    public MessageType getMsgType() {
+        return msgType;
+    }
+
+    public void setMsgType(MessageType type) {
+        msgType = type;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getParentId() {
+        return parentId;
+    }
+
+    public void setParentId(String id) {
+        parentId = id;
+    }
+
+    public EventStatus getEventStatus() {
+        return eventStatus;
+    }
+
+    public void setEventStatus(EventStatus eStatus) {
+        eventStatus = eStatus;
+    }
+
+    @Override
+    public String getAppName() {
+        return appName;
+    }
+
+    public void setAppName(String name) {
+        appName = name;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String euser) {
+        user = euser;
+    }
+
+    public Date getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(Date time) {
+        startTime = time;
+    }
+
+    public Date getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(Date time) {
+        endTime = time;
+    }
+
+    @Override
+    public String toString() {
+        return "ID: " + getId() + ", AppType: " + getAppType() + ", Appname: " + getAppName() + ", Status: "
+                + getEventStatus();
+    }
+
+}

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event;
+
+import java.util.Date;
+
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+
+/**
+ * A sub-interface of the Event interface, related to
+ * notification events after an SLA is met or missed
+ */
+public interface SLAEvent extends Event {
+
+    public static enum EventType {
+        START_MET,
+        START_MISS,
+        END_MET,
+        END_MISS,
+        DURATION_MET,
+        DURATION_MISS
+    }
+
+    /**
+     * Get the SLA event-type
+     * @return SLAEvent.EventType
+     */
+    public EventType getEventType();
+
+    /**
+     * Get the job id
+     * @return String id
+     */
+    public String getId();
+
+    /**
+     * Get the parent job id
+     * @return String id
+     */
+    public String getParentId();
+
+    /**
+     * Get the actual job status
+     * @return EventStatus status
+     */
+    public EventStatus getJobStatus();
+
+    /**
+     * Get the job-id
+     * @return String job-id
+     */
+    public String getJobId();
+
+    /**
+     * Get the sla event sequence-id
+     * @return String sequence-id
+     */
+    public String getSequenceId();
+
+    /**
+     * Get the expected start-time for this job
+     * @return Date expected start-time
+     */
+    public Date getExpectedStart();
+
+    /**
+     * Get the actual start-time for this job
+     * @return Date actual start-time
+     */
+    public Date getActualStart();
+
+    /**
+     * Get the expected end-time for this job
+     * @return Date expected end-time
+     */
+    public Date getExpectedEnd();
+
+    /**
+     * Get the actual end-time for this job
+     * @return Date actual end-time
+     */
+    public Date getActualEnd();
+
+    /**
+     * Get the expected duration for this job
+     * @return long expected duration
+     */
+    public long getExpectedDuration();
+
+    /**
+     * Get the actual duration for this job
+     * @return long actual duration
+     */
+    public long getActualDuration();
+
+    /**
+     * Get the sla notification-message
+     * @return String notification-message
+     */
+    public String getNotificationMsg();
+
+    /**
+     * Get the SLA alert-contact
+     * @return String alert-contact
+     */
+    public String getAlertContact();
+
+    /**
+     * Get the SLA dev-contact
+     * @return String dev-contact
+     */
+    public String getDevContact();
+
+    /**
+     * Get the SLA qa-contact
+     * @return String qa-contact
+     */
+    public String getQaContact();
+
+    /**
+     * Get the SLA se-contact
+     * @return String se-contact
+     */
+    public String getSeContact();
+
+    /**
+     * Get the SLA alert-frequency
+     * @return String alert-frequency
+     */
+    public String getAlertFrequency();
+
+    /**
+     * Get the SLA alert-percentage
+     * @return String alert-percentage
+     */
+    public String getAlertPercentage();
+
+    /**
+     * Get the dependent upstream apps
+     * @return String upstream-apps
+     */
+    public String getUpstreamApps();
+
+    /**
+     * Get job related data or configuration
+     * @return String job-data
+     */
+    public String getJobData();
+
+    /**
+     * Get last modified time of this sla event
+     * @return Date last-modified-time
+     */
+    public Date getLastModified();
+
+    /**
+     * Get the user for this job sla
+     * @return String user
+     */
+    public String getUser();
+
+    /**
+     * Get the nominal time for this job under sla
+     * @return Date nominalTime
+     */
+    public Date getNominalTime();
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Wed Apr 10 00:51:43 2013
@@ -77,19 +77,19 @@ import org.apache.openjpa.persistence.jd
         // Select query used by ActionInfo command
         @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
         // Select Query used by Timeout command
-        @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending from CoordinatorActionBean a where a.id = :id"),
+        @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
         // Select query used by InputCheck command
         @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
         // Select query used by CoordActionUpdate command
-        @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.externalId = :externalId"),
+        @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"),
         // Select query used by Check command
-        @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.id = :id"),
+        @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
         // Select query used by Start command
-        @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.status, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode from CoordinatorActionBean a where a.id = :id"),
+        @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.status, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
 
-        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.status, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"),
+        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"),
 
-        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.status, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"),
+        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"),
 
         @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"),
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java Wed Apr 10 00:51:43 2013
@@ -73,7 +73,9 @@ import org.apache.openjpa.persistence.jd
 
         @NamedQuery(name = "GET_COORD_JOBS_WITH_PARENT_ID", query = "select w.id from CoordinatorJobBean w where w.bundleId = :parentId"),
 
-        @NamedQuery(name = "GET_COORD_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from CoordinatorJobBean w where w.bundleId = :parentId and (w.status NOT IN ('SUCCEEDED', 'FAILED', 'KILLED', 'DONEWITHERROR') OR w.lastModifiedTimestamp >= :lastModTime)")
+        @NamedQuery(name = "GET_COORD_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from CoordinatorJobBean w where w.bundleId = :parentId and (w.status NOT IN ('SUCCEEDED', 'FAILED', 'KILLED', 'DONEWITHERROR') OR w.lastModifiedTimestamp >= :lastModTime)"),
+
+        @NamedQuery(name = "GET_COORD_JOB_FOR_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id")
 })
 public class CoordinatorJobBean extends JsonCoordinatorJob implements Writable {
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java Wed Apr 10 00:51:43 2013
@@ -300,7 +300,7 @@ public class JsonCoordinatorAction imple
 
     @Override
     public String toString() {
-        return MessageFormat.format("WorkflowAction name[{0}] status[{1}]",
+        return MessageFormat.format("CoordinatorAction name[{0}] status[{1}]",
                                     getId(), getStatus());
     }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java Wed Apr 10 00:51:43 2013
@@ -21,6 +21,7 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.FaultInjection;
 import org.apache.oozie.XException;
 import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.MemoryLocksService;
 import org.apache.oozie.service.Services;
@@ -77,6 +78,7 @@ public abstract class XCommand<T> implem
     protected Instrumentation instrumentation;
 
     protected XLog.Info logInfo;
+    protected static EventHandlerService eventService;
 
     /**
      * Create a command.
@@ -93,6 +95,7 @@ public abstract class XCommand<T> implem
         createdTime = System.currentTimeMillis();
         logInfo = new XLog.Info();
         instrumentation = Services.get().get(InstrumentationService.class).get();
+        eventService = Services.get().get(EventHandlerService.class);
     }
 
     /**

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Wed Apr 10 00:51:43 2013
@@ -23,10 +23,12 @@ import java.util.Date;
 import java.util.List;
 
 import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.InstrumentUtils;
@@ -42,6 +44,7 @@ import org.apache.oozie.command.CommandE
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 
 /**
@@ -69,7 +72,6 @@ public class CoordActionCheckXCommand ex
         try {
             InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
             WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetJPAExecutor(coordAction.getExternalId()));
-
             Status slaStatus = null;
 
             if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) {
@@ -116,6 +118,11 @@ public class CoordActionCheckXCommand ex
             }
 
             jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
+            if (EventHandlerService.isEventsConfigured()) {
+                CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
+                        coordAction.getJobId()));
+                generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+            }
         }
         catch (XException ex) {
             LOG.warn("CoordActionCheckCommand Failed ", ex);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Wed Apr 10 00:51:43 2013
@@ -43,6 +43,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
@@ -164,7 +165,7 @@ public class CoordActionInputCheckXComma
             }
             else {
                 if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
-                    queue(new CoordActionTimeOutXCommand(coordAction));
+                    queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                 }
                 else {
                     // Let CoordPushDependencyCheckXCommand queue the timeout
@@ -176,7 +177,8 @@ public class CoordActionInputCheckXComma
             if (isTimeout(currentTime)) {
                 LOG.debug("Queueing timeout command");
                 // XCommand.queue() will not work when there is a Exception
-                Services.get().get(CallableQueueService.class).queue(new CoordActionTimeOutXCommand(coordAction));
+                Services.get().get(CallableQueueService.class)
+                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
             }
             throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
         }
@@ -210,6 +212,11 @@ public class CoordActionInputCheckXComma
             try {
                 if (isChangeInDependency) {
                     jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
+                    if (EventHandlerService.isEventsConfigured()
+                            && coordAction.getStatus() != CoordinatorAction.Status.READY) {
+                        // skip READY actions: no event is to be generated until the action is RUNNING via StartX
+                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+                    }
                 }
                 else {
                     jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java Wed Apr 10 00:51:43 2013
@@ -109,7 +109,8 @@ public class CoordActionReadyXCommand ex
                 // change state of action to SUBMITTED
                 action.setStatus(CoordinatorAction.Status.SUBMITTED);
                 // queue action to start action
-                queue(new CoordActionStartXCommand(action.getId(), user, authToken, action.getJobId()), 100);
+                queue(new CoordActionStartXCommand(action.getId(), user, coordJob.getAppName(), authToken,
+                        action.getJobId()), 100);
                 try {
                     jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(action));
                 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java Wed Apr 10 00:51:43 2013
@@ -30,6 +30,7 @@ import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.service.DagEngineService;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.JobUtils;
@@ -43,6 +44,7 @@ import org.apache.oozie.client.SLAEvent.
 import org.apache.oozie.client.SLAEvent.Status;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStartJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.jdom.Element;
@@ -65,6 +67,7 @@ public class CoordActionStartXCommand ex
     private final XLog log = getLog();
     private String actionId = null;
     private String user = null;
+    private String appName = null;
     private String authToken = null;
     private CoordinatorActionBean coordAction = null;
     private JPAService jpaService = null;
@@ -72,11 +75,12 @@ public class CoordActionStartXCommand ex
     private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
 
-    public CoordActionStartXCommand(String id, String user, String token, String jobId) {
+    public CoordActionStartXCommand(String id, String user, String appName, String token, String jobId) {
         //super("coord_action_start", "coord_action_start", 1, XLog.OPS);
         super("coord_action_start", "coord_action_start", 1);
         this.actionId = ParamChecker.notEmpty(id, "id");
         this.user = ParamChecker.notEmpty(user, "user");
+        this.appName = ParamChecker.notEmpty(appName, "appName");
         this.authToken = ParamChecker.notEmpty(token, "token");
         this.jobId = jobId;
     }
@@ -190,6 +194,9 @@ public class CoordActionStartXCommand ex
                     updateList.add(coordAction);
                     try {
                         jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
+                        if (EventHandlerService.isEventsConfigured()) {
+                            generateEvent(coordAction, user, appName);
+                        }
                     }
                     catch (JPAExecutorException je) {
                         throw new CommandException(je);
@@ -243,6 +250,9 @@ public class CoordActionStartXCommand ex
                     try {
                         // call JPAExecutor to do the bulk writes
                         jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
+                        if (EventHandlerService.isEventsConfigured()) {
+                            generateEvent(coordAction, user, appName);
+                        }
                     }
                     catch (JPAExecutorException je) {
                         throw new CommandException(je);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java Wed Apr 10 00:51:43 2013
@@ -27,6 +27,7 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.executor.jpa.CoordActionGetForTimeoutJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.LogUtils;
@@ -37,12 +38,15 @@ import org.apache.oozie.util.ParamChecke
  */
 public class CoordActionTimeOutXCommand extends CoordinatorXCommand<Void> {
     private CoordinatorActionBean actionBean;
+    private String user;
+    private String appName;
     private JPAService jpaService = null;
 
-    public CoordActionTimeOutXCommand(CoordinatorActionBean actionBean) {
+    public CoordActionTimeOutXCommand(CoordinatorActionBean actionBean, String user, String appName) {
         super("coord_action_timeout", "coord_action_timeout", 1);
-        ParamChecker.notNull(actionBean, "ActionBean");
-        this.actionBean = actionBean;
+        this.actionBean = ParamChecker.notNull(actionBean, "ActionBean");
+        this.user = ParamChecker.notEmpty(user, "user");
+        this.appName = ParamChecker.notEmpty(appName, "appName");
     }
 
     /* (non-Javadoc)
@@ -52,10 +56,13 @@ public class CoordActionTimeOutXCommand 
     protected Void execute() throws CommandException {
         if (actionBean.getStatus() == CoordinatorAction.Status.WAITING) {
             actionBean.setStatus(CoordinatorAction.Status.TIMEDOUT);
-            queue(new CoordActionNotificationXCommand(actionBean), 100);
-            actionBean.setLastModifiedTime(new Date());
             try {
+                queue(new CoordActionNotificationXCommand(actionBean), 100);
+                actionBean.setLastModifiedTime(new Date());
                 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(actionBean));
+                if (EventHandlerService.isEventsConfigured()) {
+                    generateEvent(actionBean, user, appName);
+                }
             }
             catch (JPAExecutorException e) {
                 throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Wed Apr 10 00:51:43 2013
@@ -22,10 +22,12 @@ import java.util.Date;
 import java.util.List;
 
 import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.LogUtils;
@@ -39,6 +41,7 @@ import org.apache.oozie.command.CommandE
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 
 public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
@@ -64,7 +67,6 @@ public class CoordActionUpdateXCommand e
     protected Void execute() throws CommandException {
         try {
             LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
-
             Status slaStatus = null;
             CoordinatorAction.Status preCoordStatus = coordAction.getStatus();
             if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
@@ -131,6 +133,11 @@ public class CoordActionUpdateXCommand e
             }
 
             jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
+            if (EventHandlerService.isEventsConfigured()) {
+                CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(coordAction
+                        .getJobId()));
+                generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+            }
 
             LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Wed Apr 10 00:51:43 2013
@@ -42,6 +42,7 @@ import org.apache.oozie.executor.jpa.Bul
 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
@@ -111,6 +112,9 @@ public class CoordMaterializeTransitionX
             for (JsonBean actionBean : insertList) {
                 if (actionBean instanceof CoordinatorActionBean) {
                     CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
+                    if (EventHandlerService.isEventsConfigured()) {
+                        CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+                    }
                     if (coordAction.getPushMissingDependencies() != null) {
                         // TODO: Delay in catchup mode?
                         queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
@@ -259,7 +263,7 @@ public class CoordMaterializeTransitionX
             coordJob.resetPending();
         }
         catch (Exception e) {
-            LOG.error("Excepion thrown :", e);
+            LOG.error("Exception thrown :", e);
             throw new CommandException(ErrorCode.E1001, e.getMessage(), e);
         }
         cron.stop();

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Wed Apr 10 00:51:43 2013
@@ -42,6 +42,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.RecoveryService;
 import org.apache.oozie.service.Service;
@@ -140,7 +141,7 @@ public class CoordPushDependencyCheckXCo
                     // Checking for timeout
                     timeout = isTimeout();
                     if (timeout) {
-                        queue(new CoordActionTimeOutXCommand(coordAction));
+                        queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                     }
                     else {
                         queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
@@ -164,7 +165,7 @@ public class CoordPushDependencyCheckXCo
                 if (isTimeout()) {
                     LOG.debug("Queueing timeout command");
                     // XCommand.queue() will not work when there is a Exception
-                    callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction));
+                    callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                     unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
                 }
                 else if (coordAction.getMissingDependencies() != null
@@ -252,6 +253,11 @@ public class CoordPushDependencyCheckXCo
             try {
                 if (isChangeInDependency) {
                     jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+                    if (EventHandlerService.isEventsConfigured()
+                            && coordAction.getStatus() != CoordinatorAction.Status.READY) {
+                        //since event is not to be generated unless action RUNNING via StartX
+                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+                    }
                 }
                 else {
                     jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Wed Apr 10 00:51:43 2013
@@ -35,6 +35,7 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
@@ -45,6 +46,7 @@ import org.apache.oozie.coord.CoordUtils
 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.InstrumentUtils;
@@ -404,6 +406,14 @@ public class CoordRerunXCommand extends 
     public void performWrites() throws CommandException {
         try {
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+            if (EventHandlerService.isEventsConfigured()) {
+                for (JsonBean bean : updateList) {
+                    if (bean instanceof CoordinatorActionBean) {
+                        CoordinatorXCommand.generateEvent((CoordinatorActionBean) bean, coordJob.getUser(),
+                                coordJob.getAppName());
+                    }
+                }
+            }
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java Wed Apr 10 00:51:43 2013
@@ -17,7 +17,14 @@
  */
 package org.apache.oozie.command.coord;
 
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.event.Event.AppType;
 import org.apache.oozie.command.XCommand;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
+import org.apache.oozie.util.ParamChecker;
 
 /**
  * Abstract coordinator command class derived from XCommand
@@ -47,4 +54,74 @@ public abstract class CoordinatorXComman
         super(name, type, priority, dryrun);
     }
 
+    /**
+     * Queue an event describing the current state of a coordinator action. Does nothing unless the
+     * event service supports the COORDINATOR_ACTION app type.
+     * <p/>
+     * The missing-dependency string put on the event holds only the first entry of the action's
+     * missing dependencies and the first entry of its push missing dependencies, joined by
+     * {@link CoordELFunctions#INSTANCE_SEPARATOR}. A list that is null or empty contributes nothing.
+     *
+     * @param coordAction coordinator action the event is generated for
+     * @param user user name put on the event
+     * @param appName application name put on the event
+     */
+    protected static void generateEvent(CoordinatorActionBean coordAction, String user, String appName) {
+        if (eventService.checkSupportedApptype(AppType.COORDINATOR_ACTION.name())) {
+            ParamChecker.notNull(coordAction, "coordAction");
+            ParamChecker.notNull(user, "user");
+            ParamChecker.notNull(appName, "appName");
+            String missDep = firstDependency(coordAction.getMissingDependencies());
+            String pushMissDep = firstDependency(coordAction.getPushMissingDependencies());
+            String deps;
+            if (missDep == null) {
+                deps = pushMissDep;
+            }
+            else if (pushMissDep == null) {
+                deps = missDep;
+            }
+            else {
+                deps = missDep + CoordELFunctions.INSTANCE_SEPARATOR + pushMissDep;
+            }
+            CoordinatorActionEvent event = new CoordinatorActionEvent(coordAction.getId(), coordAction.getJobId(),
+                    coordAction.getStatus(), user, appName, coordAction.getNominalTime(), coordAction.getCreatedTime(),
+                    deps);
+            event.setErrorCode(coordAction.getErrorCode());
+            event.setErrorMessage(coordAction.getErrorMessage());
+            eventService.queueEvent(event);
+        }
+    }
+
+    /**
+     * Return the first entry of a dependency list.
+     *
+     * @param dependencies dependency entries joined by {@link CoordELFunctions#INSTANCE_SEPARATOR}, may be null
+     * @return the first entry, or null when the list is null, empty or made up of separators only
+     */
+    private static String firstDependency(String dependencies) {
+        if (dependencies == null || dependencies.length() == 0) {
+            // an empty list must not show up as an empty entry next to a separator
+            return null;
+        }
+        // String.split drops trailing empty strings, so the array can be empty
+        String[] entries = dependencies.split(CoordELFunctions.INSTANCE_SEPARATOR);
+        return entries.length == 0 ? null : entries[0];
+    }
+
+    /**
+     * Queue an event describing the current state of a coordinator job. Does nothing unless the
+     * event service supports the COORDINATOR_JOB app type.
+     *
+     * @param coordJob coordinator job the event is generated for
+     */
+    protected void generateEvent(CoordinatorJobBean coordJob) {
+        if (eventService.checkSupportedApptype(AppType.COORDINATOR_JOB.name())) {
+            ParamChecker.notNull(coordJob, "coordJob");
+            CoordinatorJobEvent event = new CoordinatorJobEvent(coordJob.getId(), coordJob.getBundleId(),
+                    coordJob.getStatus(), coordJob.getUser(), coordJob.getAppName(), coordJob.getStartTime(),
+                    coordJob.getEndTime());
+            eventService.queueEvent(event);
+        }
+    }
+
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Wed Apr 10 00:51:43 2013
@@ -33,6 +33,7 @@ import org.apache.oozie.executor.jpa.Bul
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.workflow.WorkflowException;
@@ -155,6 +156,9 @@ public class KillXCommand extends Workfl
             wfJob.setLastModifiedTime(new Date());
             updateList.add(wfJob);
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+            if (EventHandlerService.isEventsConfigured()) {
+                generateEvent(wfJob, null, null);
+            }
             queue(new NotificationXCommand(wfJob));
         }
         catch (JPAExecutorException e) {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java Wed Apr 10 00:51:43 2013
@@ -41,6 +41,7 @@ import org.apache.oozie.executor.jpa.Bul
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -74,7 +75,6 @@ public class ResumeXCommand extends Work
                 workflow.setWorkflowInstance(wfInstance);
                 workflow.setStatus(WorkflowJob.Status.RUNNING);
 
-
                 //for (WorkflowActionBean action : store.getActionsForWorkflow(id, false)) {
                 for (WorkflowActionBean action : jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id))) {
 
@@ -130,6 +130,9 @@ public class ResumeXCommand extends Work
                 workflow.setLastModifiedTime(new Date());
                 updateList.add(workflow);
                 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+                if (EventHandlerService.isEventsConfigured()) {
+                    generateEvent(workflow);
+                }
                 queue(new NotificationXCommand(workflow));
             }
             return null;

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Apr 10 00:51:43 2013
@@ -37,6 +37,7 @@ import org.apache.oozie.executor.jpa.JPA
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.service.ELService;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchemaService;
 import org.apache.oozie.service.Services;
@@ -311,6 +312,14 @@ public class SignalXCommand extends Work
             updateList.add(wfJob);
             // call JPAExecutor to do the bulk writes
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+            if (EventHandlerService.isEventsConfigured()) {
+                if (wfAction != null) {
+                    generateEvent(wfJob, wfAction.getErrorCode(), wfAction.getErrorMessage());
+                }
+                else {
+                    generateEvent(wfJob, null, null);
+                }
+            }
         }
         catch (JPAExecutorException je) {
             throw new CommandException(je);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java Wed Apr 10 00:51:43 2013
@@ -33,6 +33,7 @@ import org.apache.oozie.executor.jpa.Bul
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.InstrumentUtils;
@@ -100,6 +101,9 @@ public class SuspendXCommand extends Wor
             workflow.setWorkflowInstance(wfInstance);
 
             setPendingFalseForActions(jpaService, id, actionId, updateList);
+            if (EventHandlerService.isEventsConfigured()) {
+                generateEvent(workflow);
+            }
         }
     }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java Wed Apr 10 00:51:43 2013
@@ -17,7 +17,13 @@
  */
 package org.apache.oozie.command.wf;
 
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.event.Event.AppType;
 import org.apache.oozie.command.XCommand;
+import org.apache.oozie.event.WorkflowActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.util.ParamChecker;
 
 /**
  * Abstract coordinator command class derived from XCommand
@@ -48,4 +54,52 @@ public abstract class WorkflowXCommand<T
         super(name, type, priority, dryrun);
     }
 
+    /**
+     * Queue an event for the given workflow job, attaching the supplied error details. Does nothing
+     * unless the event service supports the WORKFLOW_JOB app type.
+     *
+     * @param wfJob workflow job the event is generated for
+     * @param errorCode error code set on the event, may be null
+     * @param errorMsg error message set on the event, may be null
+     */
+    protected static void generateEvent(WorkflowJobBean wfJob, String errorCode, String errorMsg) {
+        if (!eventService.checkSupportedApptype(AppType.WORKFLOW_JOB.name())) {
+            return;
+        }
+        ParamChecker.notNull(wfJob, "wfJob");
+        WorkflowJobEvent jobEvent = new WorkflowJobEvent(wfJob.getId(), wfJob.getParentId(), wfJob.getStatus(),
+                wfJob.getUser(), wfJob.getAppName(), wfJob.getStartTime(), wfJob.getEndTime());
+        jobEvent.setErrorCode(errorCode);
+        jobEvent.setErrorMessage(errorMsg);
+        eventService.queueEvent(jobEvent);
+    }
+
+    /**
+     * Queue an event for the given workflow job without any error code or message.
+     *
+     * @param wfJob workflow job the event is generated for
+     */
+    protected static void generateEvent(WorkflowJobBean wfJob) {
+        generateEvent(wfJob, null, null);
+    }
+
+    /**
+     * Queue an event for the given workflow action, carrying the action's own error code and
+     * message. Does nothing unless the event service supports the WORKFLOW_ACTION app type.
+     *
+     * @param wfAction workflow action the event is generated for
+     * @param wfUser user name put on the event
+     */
+    protected void generateEvent(WorkflowActionBean wfAction, String wfUser) {
+        if (!eventService.checkSupportedApptype(AppType.WORKFLOW_ACTION.name())) {
+            return;
+        }
+        ParamChecker.notNull(wfAction, "wfAction");
+        WorkflowActionEvent actionEvent = new WorkflowActionEvent(wfAction.getId(), wfAction.getJobId(),
+                wfAction.getStatus(), wfUser, wfAction.getName(), wfAction.getStartTime(), wfAction.getEndTime());
+        actionEvent.setErrorCode(wfAction.getErrorCode());
+        actionEvent.setErrorMessage(wfAction.getErrorMessage());
+        eventService.queueEvent(actionEvent);
+    }
+
 }

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Bundle Jobs
+ */
+public class BundleJobEvent extends JobEvent {
+
+    private BundleJob.Status status;
+
+    /**
+     * Create a bundle job event. The event is created without a parent id.
+     *
+     * @param id bundle job id
+     * @param status bundle job status
+     * @param user user name
+     * @param appName application name
+     * @param startTime job start time
+     * @param endTime job end time
+     */
+    public BundleJobEvent(String id, BundleJob.Status status, String user, String appName, Date startTime, Date endTime) {
+        super(AppType.BUNDLE_JOB, id, null, user, appName); //parentId is null
+        setStatus(status);
+        setStartTime(startTime);
+        setEndTime(endTime);
+        XLog eventLog = XLog.getLog(EventHandlerService.class);
+        eventLog.debug("Event generated - " + toString());
+    }
+
+    /**
+     * @return the bundle job status stored on this event
+     */
+    public BundleJob.Status getStatus() {
+        return status;
+    }
+
+    /**
+     * Store the bundle job status and derive the high-level event status from it. Statuses not
+     * listed in the switch leave the event status as it was.
+     *
+     * @param bstatus bundle job status
+     */
+    public void setStatus(BundleJob.Status bstatus) {
+        this.status = bstatus;
+        // set high-level status for event based on low-level actual job status
+        // this is to ease filtering on the consumer side
+        switch (bstatus) {
+            case SUCCEEDED:
+                setEventStatus(EventStatus.SUCCESS);
+                break;
+            case RUNNING:
+                setEventStatus(EventStatus.STARTED);
+                break;
+            case SUSPENDED:
+            case SUSPENDEDWITHERROR:
+                setEventStatus(EventStatus.SUSPEND);
+                break;
+            case KILLED:
+            case FAILED:
+            case DONEWITHERROR:
+                setEventStatus(EventStatus.FAILURE);
+                break;
+            default:
+                // no event status mapping for the remaining bundle statuses
+                break;
+        }
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Coordinator Actions
+ */
+public class CoordinatorActionEvent extends JobEvent {
+
+    private CoordinatorAction.Status status;
+    private Date nominalTime;
+    private String missingDeps;
+    private String errorCode;
+    private String errorMessage;
+    // TODO more attributes - frequency, timeunit, bundleName
+    // for some advanced processing and linking using events
+
+    /**
+     * Create an event for a coordinator action and log it at debug level.
+     *
+     * @param id forwarded to the JobEvent constructor as the action id
+     * @param parentId forwarded to the JobEvent constructor as the parent id
+     * @param status coordinator action status; also drives the event status
+     * @param user forwarded to the JobEvent constructor
+     * @param appName forwarded to the JobEvent constructor
+     * @param nomTime nominal time of the action
+     * @param startTime stored through setStartTime
+     * @param missDeps missing dependencies of the action
+     */
+    public CoordinatorActionEvent(String id, String parentId, CoordinatorAction.Status status, String user,
+            String appName, Date nomTime, Date startTime, String missDeps) {
+        super(AppType.COORDINATOR_ACTION, id, parentId, user, appName);
+        setStatus(status);
+        setNominalTime(nomTime);
+        setStartTime(startTime);
+        setMissingDeps(missDeps);
+        XLog.getLog(EventHandlerService.class).debug("Event generated - " + this.toString());
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return always null for now
+     */
+    public String getBundleJobId() {
+        return null; // TODO extract prefix from bundleActionId before '@'
+    }
+
+    /**
+     * @return the coordinator action status last passed to {@link #setStatus}
+     */
+    public CoordinatorAction.Status getStatus() {
+        return status;
+    }
+
+    /**
+     * Store the coordinator action status and derive the high-level event
+     * status from it, which eases filtering on the consumer side. Statuses
+     * not listed in the switch leave the event status untouched.
+     * <p>
+     * For SUCCEEDED, KILLED, FAILED and TIMEDOUT the end time is also set
+     * to the current time.
+     *
+     * @param castatus coordinator action status
+     */
+    public void setStatus(CoordinatorAction.Status castatus) {
+        status = castatus;
+        switch (status) {
+            case WAITING:
+                setEventStatus(EventStatus.WAITING);
+                break;
+            case SUCCEEDED:
+                setEventStatus(EventStatus.SUCCESS);
+                setEndTime(new Date());
+                break;
+            case RUNNING:
+                setEventStatus(EventStatus.STARTED);
+                break;
+            case SUSPENDED:
+                setEventStatus(EventStatus.SUSPEND);
+                break;
+            case KILLED:
+            case FAILED:
+            case TIMEDOUT:
+                setEventStatus(EventStatus.FAILURE);
+                setEndTime(new Date());
+                break;
+            default:
+                // no high-level event status for the remaining statuses
+                break;
+        }
+    }
+
+    /**
+     * @return nominal time of the action
+     */
+    public Date getNominalTime() {
+        return nominalTime;
+    }
+
+    /**
+     * @param time nominal time of the action
+     */
+    public void setNominalTime(Date time) {
+        nominalTime = time;
+    }
+
+    /**
+     * @return missing dependencies of the action
+     */
+    public String getMissingDeps() {
+        return missingDeps;
+    }
+
+    /**
+     * @param dependencies missing dependencies of the action
+     */
+    public void setMissingDeps(String dependencies) {
+        missingDeps = dependencies;
+    }
+
+    /**
+     * @return error code, or null when none was set
+     */
+    public String getErrorCode() {
+        return errorCode;
+    }
+
+    /**
+     * @param code error code
+     */
+    public void setErrorCode(String code) {
+        errorCode = code;
+    }
+
+    /**
+     * @return error message, or null when none was set
+     */
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    /**
+     * @param msg error message
+     */
+    public void setErrorMessage(String msg) {
+        // store into errorMessage; assigning errorCode here would clobber the
+        // error code and leave getErrorMessage() returning null
+        errorMessage = msg;
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Coordinator Jobs
+ */
+public class CoordinatorJobEvent extends JobEvent {
+
+    private CoordinatorJob.Status status;
+
+    public CoordinatorJobEvent(String id, String parentId, CoordinatorJob.Status status, String user, String appName,
+            Date startTime, Date endTime) {
+        super(AppType.COORDINATOR_JOB, id, parentId, user, appName);
+        setStatus(status);
+        setStartTime(startTime);
+        setEndTime(endTime);
+        XLog.getLog(EventHandlerService.class).debug("Event generated - " + this.toString());
+    }
+
+    public CoordinatorJob.Status getStatus() {
+        return status;
+    }
+
+    public void setStatus(CoordinatorJob.Status coordStatus) {
+        status = coordStatus;
+        // set high-level status for event based on low-level actual job status
+        // this is to ease filtering on the consumer side
+        switch (status) {
+            case SUCCEEDED:
+                setEventStatus(EventStatus.SUCCESS);
+                break;
+            case RUNNING:
+                setEventStatus(EventStatus.STARTED);
+                break;
+            case SUSPENDED:
+            case PREPSUSPENDED:
+            case SUSPENDEDWITHERROR:
+                setEventStatus(EventStatus.SUSPEND);
+                break;
+            case KILLED:
+            case FAILED:
+            case DONEWITHERROR:
+                setEventStatus(EventStatus.FAILURE);
+        }
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.oozie.client.event.Event;
+
+/**
+ * Interface to define the queue operations for the events system
+ */
+public interface EventQueue {
+
+    /**
+     * Serializable wrapper holding one {@link Event}; the element type an
+     * implementation can store in its backing queue.
+     */
+    public static class EventQueueElement implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        /** The wrapped event. */
+        Event event;
+
+        /**
+         * @param e event to wrap
+         */
+        public EventQueueElement(Event e) {
+            this.event = e;
+        }
+    }
+
+    /**
+     * Set up the event queue.
+     *
+     * @param queueSize size setting for the queue
+     * @param batchSize batch size setting used when polling in batch
+     */
+    void init(int queueSize, int batchSize);
+
+    /**
+     * Put an event on the queue.
+     *
+     * @param e event to add
+     */
+    void add(Event e);
+
+    /**
+     * Take a batch of events off the queue.
+     *
+     * @return set of the fetched events
+     */
+    Set<Event> pollBatch();
+
+    /**
+     * Take a single event off the queue.
+     *
+     * @return the fetched event
+     */
+    Event poll();
+
+    /**
+     * Tell whether the queue holds no events.
+     *
+     * @return true when the queue is empty
+     */
+    boolean isEmpty();
+
+    /**
+     * Report the current queue size.
+     *
+     * @return current size
+     */
+    int getCurrentSize();
+
+    /**
+     * Look at the topmost event without popping it from the queue.
+     *
+     * @return the topmost event
+     */
+    Event peek();
+
+    /**
+     * Report the batch size used while polling events.
+     *
+     * @return batch size
+     */
+    int getBatchSize();
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.util.XLog;
+
+/**
+ * An implementation of the EventQueue, defining a memory-based data structure
+ * holding the events
+ */
+public class MemoryEventQueue implements EventQueue {
+
+    private static ConcurrentLinkedQueue<EventQueueElement> eventQueue;
+    private static AtomicInteger currentSize;
+    private static int maxSize;
+    private static XLog LOG;
+    private static int batchSize;
+
+    @Override
+    public void init(int queueSize, int batchsize) {
+        eventQueue = new ConcurrentLinkedQueue<EventQueueElement>();
+        maxSize = queueSize;
+        currentSize = new AtomicInteger();
+        batchSize = batchsize;
+        LOG = XLog.getLog(getClass());
+    }
+
+    @Override
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    @Override
+    public void add(Event e) {
+        EventQueueElement eqe = new EventQueueElement(e);
+        try {
+            if(getCurrentSize() <= maxSize) {
+                if(eventQueue.add(eqe)) {
+                    currentSize.incrementAndGet();
+                }
+            }
+            else {
+                LOG.warn("Queue size [{0}] reached max limit. Element [{1}] not added", getCurrentSize(), e);
+            }
+        }
+        catch (IllegalStateException ise) {
+            LOG.warn("Unable to add event due to " + ise);
+        }
+    }
+
+    @Override
+    public Set<Event> pollBatch() {
+        // batch drain
+        Set<Event> eventBatch = new HashSet<Event>();
+        for (int i = 0; i < batchSize; i++) {
+            EventQueueElement polled = eventQueue.poll();
+            if (polled != null) {
+                currentSize.decrementAndGet();
+                eventBatch.add(polled.event);
+            }
+            else {
+                LOG.debug("Current queue size [{0}] less than polling batch size [{1}]", currentSize.get(), batchSize);
+                break;
+            }
+        }
+        return eventBatch;
+    }
+
+    @Override
+    public Event poll() {
+        EventQueueElement polled = eventQueue.poll();
+        if (polled != null) {
+            currentSize.decrementAndGet();
+            return polled.event;
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return getCurrentSize() == 0;
+    }
+
+    @Override
+    public int getCurrentSize() {
+        return currentSize.intValue();
+    }
+
+    @Override
+    public Event peek() {
+        EventQueueElement peeked = eventQueue.peek();
+        if (peeked != null) {
+            return peeked.event;
+        }
+        return null;
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Workflow Actions
+ */
+public class WorkflowActionEvent extends JobEvent {
+
+    private WorkflowAction.Status status;
+    private String hadoopId;
+    private String errorCode;
+    private String errorMessage;
+
+    /**
+     * Create an event for a workflow action and log it at debug level.
+     *
+     * @param id forwarded to the JobEvent constructor as the action id
+     * @param parentId forwarded to the JobEvent constructor as the parent id
+     * @param status workflow action status; also drives the event status
+     * @param user forwarded to the JobEvent constructor
+     * @param appName forwarded to the JobEvent constructor
+     * @param startTime stored through setStartTime
+     * @param endTime stored through setEndTime
+     */
+    public WorkflowActionEvent(String id, String parentId, WorkflowAction.Status status, String user, String appName,
+            Date startTime, Date endTime) {
+        super(AppType.WORKFLOW_ACTION, id, parentId, user, appName);
+        setStatus(status);
+        setStartTime(startTime);
+        setEndTime(endTime);
+        XLog.getLog(EventHandlerService.class).debug("Event generated - " + this.toString());
+    }
+
+    /**
+     * @return the workflow action status last passed to {@link #setStatus}
+     */
+    public WorkflowAction.Status getStatus() {
+        return status;
+    }
+
+    /**
+     * Store the workflow action status and derive the high-level event
+     * status from it, which eases filtering on the consumer side. Statuses
+     * not listed in the switch leave the event status untouched.
+     *
+     * @param actionStatus workflow action status
+     */
+    public void setStatus(WorkflowAction.Status actionStatus) {
+        status = actionStatus;
+        switch (actionStatus) {
+            case OK:
+                setEventStatus(EventStatus.SUCCESS);
+                break;
+            case RUNNING:
+                setEventStatus(EventStatus.STARTED);
+                break;
+            case ERROR:
+            case KILLED:
+            case FAILED:
+                setEventStatus(EventStatus.FAILURE);
+                // without this break the failure statuses fall through and
+                // are overwritten with SUSPEND
+                break;
+            case START_MANUAL:
+            case END_MANUAL:
+                setEventStatus(EventStatus.SUSPEND);
+                break;
+            default:
+                // no high-level event status for the remaining statuses
+                break;
+        }
+    }
+
+    /**
+     * @return hadoop id, or null when none was set
+     */
+    public String getHadoopId() {
+        return hadoopId;
+    }
+
+    /**
+     * @param id hadoop id
+     */
+    public void setHadoopId(String id) {
+        hadoopId = id;
+    }
+
+    /**
+     * @return error code, or null when none was set
+     */
+    public String getErrorCode() {
+        return errorCode;
+    }
+
+    /**
+     * @param code error code
+     */
+    public void setErrorCode(String code) {
+        errorCode = code;
+    }
+
+    /**
+     * @return error message, or null when none was set
+     */
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    /**
+     * @param msg error message
+     */
+    public void setErrorMessage(String msg) {
+        // store into errorMessage; assigning errorCode here would clobber the
+        // error code and leave getErrorMessage() returning null
+        errorMessage = msg;
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.event;
+
+import java.util.Date;
+
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Class implementing JobEvent for events generated by Workflow Jobs
+ */
+public class WorkflowJobEvent extends JobEvent {
+
+    private WorkflowJob.Status status;
+    private String errorCode;
+    private String errorMessage;
+    // TODO more attributes - run, coordName, bundleId
+    // for some advanced processing and linking using events
+
+    public WorkflowJobEvent(String id, String parentId, WorkflowJob.Status status, String user, String appName,
+            Date startTime, Date endTime) {
+        super(AppType.WORKFLOW_JOB, id, parentId, user, appName);
+        setStatus(status);
+        setStartTime(startTime);
+        setEndTime(endTime);
+        XLog.getLog(EventHandlerService.class).debug("Event generated - " + this.toString());
+    }
+
+    public String getCoordJobId() {
+        return null; // TODO extract prefix from coordActionId before '@'
+    }
+
+    public WorkflowJob.Status getStatus() {
+        return status;
+    }
+
+    public String getErrorCode() {
+        return errorCode;
+    }
+
+    public void setErrorCode(String code) {
+        errorCode = code;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public void setErrorMessage(String msg) {
+        errorMessage = msg;
+    }
+
+    public void setStatus(WorkflowJob.Status wfstatus) {
+        status = wfstatus;
+        // set high-level status for event based on low-level actual job status
+        // this is to ease filtering on the consumer side
+        switch (status) {
+            case SUCCEEDED:
+                setEventStatus(EventStatus.SUCCESS);
+                break;
+            case RUNNING:
+                setEventStatus(EventStatus.STARTED);
+                break;
+            case SUSPENDED:
+                setEventStatus(EventStatus.SUSPEND);
+                break;
+            case KILLED:
+            case FAILED:
+                setEventStatus(EventStatus.FAILURE);
+        }
+    }
+
+}