You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/04/10 02:51:45 UTC

svn commit: r1466307 [2/2] - in /oozie/trunk: ./ client/src/main/java/org/apache/oozie/client/ client/src/main/java/org/apache/oozie/client/event/ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/client/rest/ core/src/main/java/...

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event.listener;
+
+import org.apache.oozie.event.BundleJobEvent;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
+import org.apache.oozie.event.WorkflowActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+
+/**
+ * Event listener for Job notification events, defining methods corresponding to
+ * job status changes
+ */
+public abstract class JobEventListener {
+
+    /**
+     * Initialize the listener
+     */
+    public abstract void init();
+
+    /**
+     * Destroy the listener
+     */
+    public abstract void destroy();
+
+    /**
+     * On workflow job transition to start state
+     * @param wje the workflow job event
+     */
+    public abstract void onWorkflowJobStart(WorkflowJobEvent wje);
+
+    /**
+     * On workflow job transition to success state
+     * @param wje the workflow job event
+     */
+    public abstract void onWorkflowJobSuccess(WorkflowJobEvent wje);
+
+    /**
+     * On workflow job transition to failure state
+     * @param wje the workflow job event
+     */
+    public abstract void onWorkflowJobFailure(WorkflowJobEvent wje);
+
+    /**
+     * On workflow job transition to suspend state
+     * @param wje the workflow job event
+     */
+    public abstract void onWorkflowJobSuspend(WorkflowJobEvent wje);
+
+    /**
+     * On workflow action transition to start state
+     * @param wae the workflow action event
+     */
+    public abstract void onWorkflowActionStart(WorkflowActionEvent wae);
+
+    /**
+     * On workflow action transition to success state
+     * @param wae the workflow action event
+     */
+    public abstract void onWorkflowActionSuccess(WorkflowActionEvent wae);
+
+    /**
+     * On workflow action transition to failure state
+     * @param wae the workflow action event
+     */
+    public abstract void onWorkflowActionFailure(WorkflowActionEvent wae);
+
+    /**
+     * On workflow action transition to suspend state
+     * @param wae the workflow action event
+     */
+    public abstract void onWorkflowActionSuspend(WorkflowActionEvent wae);
+
+    /**
+     * On coord job transition to start state
+     * @param wje the coordinator job event
+     */
+    public abstract void onCoordinatorJobStart(CoordinatorJobEvent wje);
+
+    /**
+     * On coord job transition to success state
+     * @param wje the coordinator job event
+     */
+    public abstract void onCoordinatorJobSuccess(CoordinatorJobEvent wje);
+
+    /**
+     * On coord job transition to failure state
+     * @param wje the coordinator job event
+     */
+    public abstract void onCoordinatorJobFailure(CoordinatorJobEvent wje);
+
+    /**
+     * On coord job transition to suspend state
+     * @param wje the coordinator job event
+     */
+    public abstract void onCoordinatorJobSuspend(CoordinatorJobEvent wje);
+
+    /**
+     * On coord action transition to waiting state
+     * @param wae the coordinator action event
+     */
+    public abstract void onCoordinatorActionWaiting(CoordinatorActionEvent wae);
+
+    /**
+     * On coord action transition to start state
+     * @param wae the coordinator action event
+     */
+    public abstract void onCoordinatorActionStart(CoordinatorActionEvent wae);
+
+    /**
+     * On coord action transition to success state
+     * @param wae the coordinator action event
+     */
+    public abstract void onCoordinatorActionSuccess(CoordinatorActionEvent wae);
+
+    /**
+     * On coord action transition to failure state
+     * @param wae the coordinator action event
+     */
+    public abstract void onCoordinatorActionFailure(CoordinatorActionEvent wae);
+
+    /**
+     * On coord action transition to suspend state
+     * @param wae the coordinator action event
+     */
+    public abstract void onCoordinatorActionSuspend(CoordinatorActionEvent wae);
+
+    /**
+     * On bundle job transition to start state
+     * @param wje the bundle job event
+     */
+    public abstract void onBundleJobStart(BundleJobEvent wje);
+
+    /**
+     * On bundle job transition to success state
+     * @param wje the bundle job event
+     */
+    public abstract void onBundleJobSuccess(BundleJobEvent wje);
+
+    /**
+     * On bundle job transition to failure state
+     * @param wje the bundle job event
+     */
+    public abstract void onBundleJobFailure(BundleJobEvent wje);
+
+    /**
+     * On bundle job transition to suspend state
+     * @param wje the bundle job event
+     */
+    public abstract void onBundleJobSuspend(BundleJobEvent wje);
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -90,6 +90,12 @@ public class CoordActionGetForCheckJPAEx
         if (arr[6] != null){
             bean.setSlaXml((String) arr[6]);
         }
+        if (arr[7] != null){
+            bean.setNominalTime(DateUtils.toDate((Timestamp) arr[7]));
+        }
+        if (arr[8] != null){
+            bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[8]));
+        }
         return bean;
     }
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -94,6 +94,12 @@ public class CoordActionGetForExternalId
         if (arr[6] != null){
             bean.setSlaXml((String) arr[6]);
         }
+        if (arr[7] != null){
+            bean.setNominalTime(DateUtils.toDate((Timestamp) arr[7]));
+        }
+        if (arr[8] != null){
+            bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[8]));
+        }
         return bean;
     }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -17,12 +17,15 @@
  */
 package org.apache.oozie.executor.jpa;
 
+import java.sql.Timestamp;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ParamChecker;
 
 /**
@@ -95,6 +98,12 @@ public class CoordActionGetForStartJPAEx
         if (arr[9] != null) {
             bean.setErrorCode((String) arr[9]);
         }
+        if (arr[10] != null){
+            bean.setNominalTime(DateUtils.toDate((Timestamp) arr[10]));
+        }
+        if (arr[11] != null){
+            bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[11]));
+        }
         return bean;
     }
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -17,12 +17,15 @@
  */
 package org.apache.oozie.executor.jpa;
 
+import java.sql.Timestamp;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ParamChecker;
 
 /**
@@ -80,6 +83,12 @@ public class CoordActionGetForTimeoutJPA
         if (arr[4] != null) {
             bean.setPending((Integer) arr[4]);
         }
+        if (arr[5] != null){
+            bean.setNominalTime(DateUtils.toDate((Timestamp) arr[5]));
+        }
+        if (arr[6] != null){
+            bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[6]));
+        }
         return bean;
 
     }

Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * DB query executor to fetch columns 'user' and 'appName' from Coordinator Job table
+ */
+public class CoordinatorJobGetForUserAppnameJPAExecutor implements JPAExecutor<CoordinatorJobBean> {
+
+    private String coordJobId = null;
+
+    public CoordinatorJobGetForUserAppnameJPAExecutor(String coordJobId) {
+        ParamChecker.notNull(coordJobId, "coordJobId");
+        this.coordJobId = coordJobId;
+    }
+
+    /*
+     * Name under which this executor identifies itself.
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    public String getName() {
+        return "CoordJobGetForUserAppnameJPAExecutor";
+    }
+
+    /*
+     * Runs the named query for the configured job id and copies the two
+     * selected columns (user, appName) into an otherwise empty bean. Any
+     * failure is rethrown as a JPAExecutorException with code E0603.
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
+     */
+    @Override
+    public CoordinatorJobBean execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query query = em.createNamedQuery("GET_COORD_JOB_FOR_USER_APPNAME");
+            query.setParameter("id", coordJobId);
+            Object[] row = (Object[]) query.getSingleResult();
+            CoordinatorJobBean job = new CoordinatorJobBean();
+            if (null != row[0]) {
+                job.setUser((String) row[0]);
+            }
+            if (null != row[1]) {
+                job.setAppName((String) row[1]);
+            }
+            return job;
+        }
+        catch (Exception ex) {
+            throw new JPAExecutorException(ErrorCode.E0603, ex.getMessage(), ex);
+        }
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.event.BundleJobEvent;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.event.EventQueue;
+import org.apache.oozie.event.MemoryEventQueue;
+import org.apache.oozie.event.WorkflowActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.event.listener.JobEventListener;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.sla.event.listener.SLAEventListener;
+import org.apache.oozie.util.XLog;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Service class that handles the events system - creating events queue,
+ * managing configured properties and managing and invoking various event
+ * listeners via worker threads
+ */
+public class EventHandlerService implements Service {
+
+    public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
+    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
+    public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
+    public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
+    public static final String CONF_APP_TYPES = CONF_PREFIX + "app.types";
+    public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
+    public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";
+
+    private static EventQueue eventQueue;
+    private int queueMaxSize;
+    private XLog LOG;
+    private static Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
+    private Set<String> apptypes;
+    private static int batchSize;
+    private static boolean eventsConfigured = false;
+
+    @SuppressWarnings("unchecked")
+    public void init(Services services) throws ServiceException {
+        try {
+            eventsConfigured = true;
+            Configuration conf = services.getConf();
+            queueMaxSize = conf.getInt(CONF_QUEUE_SIZE, 10000);
+            LOG = XLog.getLog(getClass());
+            Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null);
+            eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
+            batchSize = conf.getInt(CONF_BATCH_SIZE, 10);
+            eventQueue.init(queueMaxSize, batchSize);
+            // initialize app-types to switch on events for
+            initApptypes(conf);
+            // initialize event listeners
+            initEventListeners(conf);
+            // initialize worker threads via Scheduler
+            initWorkerThreads(conf, services);
+            LOG.info(
+                    "EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}], Events configured for App-types = [{3}]",
+                    eventQueue.getClass().getName(), listenerMap.toString(), apptypes);
+        }
+        catch (Exception ex) {
+            throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
+        }
+    }
+
+    private void initApptypes(Configuration conf) {
+        apptypes = new HashSet<String>();
+        for (String jobtype : conf.getStringCollection(CONF_APP_TYPES)) {
+            String tmp = jobtype.trim().toLowerCase();
+            if (tmp.length() == 0) {
+                continue;
+            }
+            apptypes.add(tmp);
+        }
+    }
+
+    private void initEventListeners(Configuration conf) {
+        Class<?>[] listenerClass = (Class<?>[]) conf.getClasses(CONF_LISTENERS);
+        for (int i = 0; i < listenerClass.length; i++) {
+            Object listener = null;
+            try {
+                listener = listenerClass[i].newInstance();
+            }
+            catch (InstantiationException e) {
+                LOG.warn("Could not create event listener instance, " + e);
+            }
+            catch (IllegalAccessException e) {
+                LOG.warn("Illegal access to event listener instance, " + e);
+            }
+            addEventListener(listener);
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public void addEventListener(Object listener) {
+        if (listener instanceof JobEventListener) {
+            List listenersList = listenerMap.get(MessageType.JOB);
+            if (listenersList == null) {
+                listenersList = new ArrayList();
+                listenerMap.put(MessageType.JOB, (List<? extends JobEventListener>) listenersList);
+            }
+            listenersList.add(listener);
+            ((JobEventListener) listener).init();
+        }
+        else if (listener instanceof SLAEventListener) {
+            List listenersList = listenerMap.get(MessageType.SLA);
+            if (listenersList == null) {
+                listenersList = new ArrayList();
+                listenerMap.put(MessageType.SLA, (List<? extends SLAEventListener>) listenersList);
+            }
+            listenersList.add(listener);
+            ((SLAEventListener) listener).init();
+        }
+        else {
+            LOG.warn("Event listener [{0}] is of undefined type", listener.getClass().getCanonicalName());
+        }
+    }
+
+    public static boolean isEventsConfigured() {
+        return eventsConfigured;
+    }
+
+    private void initWorkerThreads(Configuration conf, Services services) {
+        Runnable eventWorker = new EventWorker();
+        // schedule runnable by default every 5 min
+        services.get(SchedulerService.class).schedule(eventWorker, 10, conf.getInt(CONF_WORKER_INTERVAL, 60),
+                SchedulerService.Unit.SEC);
+    }
+
+    @Override
+    public void destroy() {
+        for (MessageType type : listenerMap.keySet()) {
+            Iterator<?> iter = listenerMap.get(type).iterator();
+            while (iter.hasNext()) {
+                if (type == MessageType.JOB) {
+                    ((JobEventListener) iter.next()).destroy();
+                }
+                else if (type == MessageType.SLA) {
+                    ((SLAEventListener) iter.next()).destroy();
+                }
+            }
+        }
+        eventsConfigured = false;
+    }
+
+    @Override
+    public Class<? extends Service> getInterface() {
+        return EventHandlerService.class;
+    }
+
+    public boolean checkSupportedApptype(String appType) {
+        if (!apptypes.contains(appType.toLowerCase())) {
+            return false;
+        }
+        return true;
+    }
+
+    public void setAppTypes(Set<String> types) {
+        apptypes = types;
+    }
+
+    public Set<String> getAppTypes() {
+        return apptypes;
+    }
+
+    public void queueEvent(Event event) {
+        eventQueue.add(event);
+    }
+
+    public EventQueue getEventQueue() {
+        return eventQueue;
+    }
+
+    public class EventWorker implements Runnable {
+
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                return;
+            }
+            if (!eventQueue.isEmpty()) {
+                Set<Event> work = eventQueue.pollBatch();
+                for (Event event : work) {
+                    MessageType msgType = event.getMsgType();
+                    List<?> listeners = listenerMap.get(msgType);
+                    if (listeners != null) {
+                        Iterator<?> iter = listeners.iterator();
+                        while (iter.hasNext()) {
+                            if (msgType == MessageType.JOB) {
+                                invokeJobEventListener(iter, (JobEvent) event);
+                            }
+                            else if (msgType == MessageType.SLA) {
+                                invokeSLAEventListener(iter, (SLAEvent) event);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        private void invokeJobEventListener(Iterator<?> iter, JobEvent event) {
+            JobEventListener el = (JobEventListener) iter.next();
+            switch (event.getAppType()) {
+                case WORKFLOW_JOB:
+                    onWorkflowJobEvent(event, el);
+                    break;
+                case WORKFLOW_ACTION:
+                    onWorkflowActionEvent(event, el);
+                    break;
+                case COORDINATOR_JOB:
+                    onCoordinatorJobEvent(event, el);
+                    break;
+                case COORDINATOR_ACTION:
+                    onCoordinatorActionEvent(event, el);
+                    break;
+                case BUNDLE_JOB:
+                    onBundleJobEvent(event, el);
+                    break;
+                default:
+                    XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
+                            event.getAppType());
+            }
+        }
+
+        private void invokeSLAEventListener(Iterator<?> iter, SLAEvent event) {
+            SLAEventListener sel = (SLAEventListener) iter.next();
+            switch (event.getEventType()) {
+                case START_MET:
+                    sel.onStartMet((SLAEvent) event);
+                    break;
+                case START_MISS:
+                    sel.onStartMiss((SLAEvent) event);
+                    break;
+                case END_MET:
+                    sel.onEndMet((SLAEvent) event);
+                    break;
+                case END_MISS:
+                    sel.onEndMiss((SLAEvent) event);
+                    break;
+                case DURATION_MET:
+                    sel.onDurationMet((SLAEvent) event);
+                    break;
+                case DURATION_MISS:
+                    sel.onDurationMiss((SLAEvent) event);
+                    break;
+                default:
+                    XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getEventType());
+            }
+        }
+
+        private void onWorkflowJobEvent(JobEvent event, JobEventListener el) {
+            switch (event.getEventStatus()) {
+                case STARTED:
+                    el.onWorkflowJobStart((WorkflowJobEvent) event);
+                    break;
+                case SUCCESS:
+                    el.onWorkflowJobSuccess((WorkflowJobEvent) event);
+                    break;
+                case FAILURE:
+                    el.onWorkflowJobFailure((WorkflowJobEvent) event);
+                    break;
+                case SUSPEND:
+                    el.onWorkflowJobSuspend((WorkflowJobEvent) event);
+                    break;
+                default:
+                    XLog.getLog(EventHandlerService.class).info("Undefined WF Job event-status - {0}",
+                            event.getEventStatus());
+            }
+        }
+
+        private void onWorkflowActionEvent(JobEvent event, JobEventListener el) {
+            switch (event.getEventStatus()) {
+                case STARTED:
+                    el.onWorkflowActionStart((WorkflowActionEvent) event);
+                    break;
+                case SUCCESS:
+                    el.onWorkflowActionSuccess((WorkflowActionEvent) event);
+                    break;
+                case FAILURE:
+                    el.onWorkflowActionFailure((WorkflowActionEvent) event);
+                    break;
+                default:
+                    XLog.getLog(EventHandlerService.class).info("Undefined WF action event-status - {0}",
+                            event.getEventStatus());
+            }
+        }
+
+        private void onCoordinatorJobEvent(JobEvent event, JobEventListener el) {
+            switch (event.getEventStatus()) {
+                case STARTED:
+                    el.onCoordinatorJobStart((CoordinatorJobEvent) event);
+                    break;
+                case SUCCESS:
+                    el.onCoordinatorJobSuccess((CoordinatorJobEvent) event);
+                    break;
+                case FAILURE:
+                    el.onCoordinatorJobFailure((CoordinatorJobEvent) event);
+                    break;
+                case SUSPEND:
+                    el.onCoordinatorJobSuspend((CoordinatorJobEvent) event);
+                    break;
+                default:
+                    XLog.getLog(EventHandlerService.class).info("Undefined Coord Job event-status - {0}",
+                            event.getEventStatus());
+            }
+        }
+
+        private void onCoordinatorActionEvent(JobEvent event, JobEventListener el) {
+            switch (event.getEventStatus()) {
+                case WAITING:
+                    el.onCoordinatorActionWaiting((CoordinatorActionEvent) event);
+                    break;
+                case STARTED:
+                    el.onCoordinatorActionStart((CoordinatorActionEvent) event);
+                    break;
+                case SUCCESS:
+                    el.onCoordinatorActionSuccess((CoordinatorActionEvent) event);
+                    break;
+                case FAILURE:
+                    el.onCoordinatorActionFailure((CoordinatorActionEvent) event);
+                    break;
+                case SUSPEND:
+                    el.onCoordinatorActionSuspend((CoordinatorActionEvent) event);
+                    break;
+                default:
+                    XLog.getLog(EventHandlerService.class).info("Undefined Coord action event-status - {0}",
+                            event.getEventStatus());
+            }
+        }
+
+        private void onBundleJobEvent(JobEvent event, JobEventListener el) {
+            switch (event.getEventStatus()) {
+                case STARTED:
+                    el.onBundleJobStart((BundleJobEvent) event);
+                    break;
+                case SUCCESS:
+                    el.onBundleJobSuccess((BundleJobEvent) event);
+                    break;
+                case FAILURE:
+                    el.onBundleJobFailure((BundleJobEvent) event);
+                    break;
+                case SUSPEND:
+                    el.onBundleJobSuspend((BundleJobEvent) event);
+                    break;
+                default:
+                    XLog.getLog(EventHandlerService.class).info("Undefined Bundle Job event-type - {0}",
+                            event.getEventStatus());
+            }
+        }
+
+    }
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java Wed Apr 10 00:51:43 2013
@@ -256,7 +256,7 @@ public class RecoveryService implements 
                         CoordinatorJobBean coordJob = jpaService
                                 .execute(new CoordJobGetJPAExecutor(caction.getJobId()));
                         queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
-                                coordJob.getAuthToken(), caction.getJobId()));
+                                coordJob.getAppName(), coordJob.getAuthToken(), caction.getJobId()));
 
                         log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
                                 + caction.getId());

Added: oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.sla.event.listener;
+
+import org.apache.oozie.client.event.SLAEvent;
+
+/**
+ * Event listener for SLA related events, defining callbacks corresponding to
+ * SLA start-time, end-time and duration limits being met or missed
+ */
+public abstract class SLAEventListener {
+
+    /**
+     * Initialize the listener
+     */
+    public abstract void init();
+
+    /**
+     * Destroy the listener
+     */
+    public abstract void destroy();
+
+    /**
+     * on SLA job start-time limit met
+     * @param work the SLA event passed to this callback
+     */
+    public abstract void onStartMet(SLAEvent work);
+
+    /**
+     * on SLA job start-time limit missed
+     * @param event the SLA event passed to this callback
+     */
+    public abstract void onStartMiss(SLAEvent event);
+
+    /**
+     * on SLA job end-time limit met
+     * @param work the SLA event passed to this callback
+     */
+    public abstract void onEndMet(SLAEvent work);
+
+    /**
+     * on SLA job end-time limit missed
+     * @param event the SLA event passed to this callback
+     */
+    public abstract void onEndMiss(SLAEvent event);
+
+    /**
+     * on SLA job duration limit met
+     * @param work the SLA event passed to this callback
+     */
+    public abstract void onDurationMet(SLAEvent work);
+
+    /**
+     * on SLA job duration limit missed
+     * @param event the SLA event passed to this callback
+     */
+    public abstract void onDurationMiss(SLAEvent event);
+
+}

Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Wed Apr 10 00:51:43 2013
@@ -1785,4 +1785,46 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.EventHandlerService.app.types</name>
+        <value>workflow_job, coordinator_action</value>
+        <description>
+            The app-types among workflow/coordinator/bundle job/action for which
+            the events system is enabled.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.EventHandlerService.event.queue</name>
+        <value>org.apache.oozie.event.MemoryEventQueue</value>
+        <description>
+            The implementation for EventQueue in use by the EventHandlerService.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.EventHandlerService.queue.size</name>
+        <value>10000</value>
+        <description>
+            Maximum number of events to be contained in the event queue.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.EventHandlerService.worker.interval</name>
+        <value>60</value>
+        <description>
+            The default interval (seconds) at which the worker threads will be scheduled to run
+            and process events.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.EventHandlerService.batch.size</name>
+        <value>10</value>
+        <description>
+            The batch size for batched draining per thread from the event queue.
+        </description>
+    </property>
+
 </configuration>

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java Wed Apr 10 00:51:43 2013
@@ -87,7 +87,7 @@ public class TestCoordActionStartXComman
     public void testActionStartCommand() throws IOException, JPAExecutorException, CommandException {
         String actionId = new Date().getTime() + "-COORD-ActionStartCommand-C@1";
         addRecordToActionTable(actionId, 1, null);
-        new CoordActionStartXCommand(actionId, "me", "mytoken", "myjob").call();
+        new CoordActionStartXCommand(actionId, "me", "myapp", "mytoken", "myjob").call();
         checkCoordAction(actionId);
     }
 
@@ -104,7 +104,7 @@ public class TestCoordActionStartXComman
         String actionId = new Date().getTime() + "-COORD-ActionStartCommand-C@1";
         String wfApp = "<start to='${someParam}' />";
         addRecordToActionTable(actionId, 1, wfApp);
-        new CoordActionStartXCommand(actionId, "me", "mytoken", "myjob").call();
+        new CoordActionStartXCommand(actionId, "me", "myapp", "mytoken", "myjob").call();
         final JPAService jpaService = Services.get().get(JPAService.class);
         CoordinatorActionBean action = jpaService.execute(new CoordActionGetForStartJPAExecutor(actionId));
         if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) {
@@ -132,7 +132,7 @@ public class TestCoordActionStartXComman
                 CoordinatorAction.Status.SUBMITTED, "coord-action-start-escape-strings.xml", 0);
 
         String actionId = action.getId();
-        new CoordActionStartXCommand(actionId, getTestUser(), "undef","myjob").call();
+        new CoordActionStartXCommand(actionId, getTestUser(), "myapp", "undef", "myjob").call();
 
         final JPAService jpaService = Services.get().get(JPAService.class);
         action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));

Added: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.command.coord.CoordActionCheckXCommand;
+import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
+import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
+import org.apache.oozie.command.coord.CoordinatorXCommand;
+import org.apache.oozie.command.wf.KillXCommand;
+import org.apache.oozie.command.wf.ResumeXCommand;
+import org.apache.oozie.command.wf.SignalXCommand;
+import org.apache.oozie.command.wf.StartXCommand;
+import org.apache.oozie.command.wf.SuspendXCommand;
+import org.apache.oozie.command.wf.WorkflowXCommand;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.ActionNodeDef;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
+import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
+import org.apache.oozie.workflow.lite.StartNodeDef;
+import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestActionNodeHandler;
+import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestControlNodeHandler;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+
+/**
+ * Test case verifying that events are correctly generated by the corresponding
+ * commands and inserted into the event queue
+ */
+public class TestEventGeneration extends XDataTestCase {
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        Services services = new Services();
+        Configuration conf = services.getConf();
+        conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
+        services.init();
+    }
+
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    public void testWorkflowJobEvent() throws Exception {
+        EventHandlerService ehs = _testEventHandlerService();
+        EventQueue queue = ehs.getEventQueue();
+        assertEquals(0, queue.getCurrentSize());
+        WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+        JPAService jpaService = Services.get().get(JPAService.class);
+
+        // Starting job: expect exactly one STARTED event
+        new StartXCommand(job.getId()).call();
+        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(wfJobGetCmd);
+        assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
+        assertEquals(1, queue.getCurrentSize());
+        WorkflowJobEvent event = (WorkflowJobEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.STARTED, event.getEventStatus());
+        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+        assertEquals(job.getId(), event.getId());
+        assertEquals(job.getUser(), event.getUser());
+        assertEquals(job.getAppName(), event.getAppName());
+        assertEquals(job.getStartTime(), event.getStartTime());
+        assertEquals(0, queue.getCurrentSize());
+
+        // Suspending job: expect exactly one SUSPEND event
+        new SuspendXCommand(job.getId()).call();
+        job = jpaService.execute(wfJobGetCmd);
+        assertEquals(WorkflowJob.Status.SUSPENDED, job.getStatus());
+        assertEquals(1, queue.getCurrentSize());
+        event = (WorkflowJobEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.SUSPEND, event.getEventStatus());
+        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+        assertEquals(job.getId(), event.getId());
+        assertEquals(job.getUser(), event.getUser());
+        assertEquals(job.getAppName(), event.getAppName());
+        assertEquals(0, queue.getCurrentSize());
+
+        // Resuming job: expect exactly one event for the job going back to RUNNING
+        new ResumeXCommand(job.getId()).call();
+        job = jpaService.execute(wfJobGetCmd);
+        assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
+        assertEquals(1, queue.getCurrentSize());
+        event = (WorkflowJobEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+        assertEquals(job.getId(), event.getId());
+        assertEquals(job.getUser(), event.getUser());
+        assertEquals(job.getAppName(), event.getAppName());
+        assertEquals(job.getStartTime(), event.getStartTime());
+        assertEquals(0, queue.getCurrentSize());
+
+        // Killing job: expect exactly one FAILURE event carrying the end time
+        new KillXCommand(job.getId()).call();
+        job = jpaService.execute(wfJobGetCmd);
+        assertEquals(WorkflowJob.Status.KILLED, job.getStatus());
+        assertEquals(1, queue.getCurrentSize());
+        event = (WorkflowJobEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.FAILURE, event.getEventStatus());
+        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+        assertEquals(job.getId(), event.getId());
+        assertEquals(job.getUser(), event.getUser());
+        assertEquals(job.getAppName(), event.getAppName());
+        assertEquals(job.getStartTime(), event.getStartTime());
+        assertEquals(job.getEndTime(), event.getEndTime());
+        assertEquals(0, queue.getCurrentSize());
+
+        // Successful job (testing SignalX)
+        job = _createWorkflowJob();
+        LiteWorkflowInstance wfInstance = (LiteWorkflowInstance) job.getWorkflowInstance();
+        wfInstance.start();
+        job.setWfInstance(wfInstance);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(job));
+        WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(job.getId() + "@one"));
+        new SignalXCommand(job.getId(), wfAction.getId()).call();
+        job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
+        assertEquals(WorkflowJob.Status.SUCCEEDED, job.getStatus());
+        assertEquals(1, queue.getCurrentSize());
+        event = (WorkflowJobEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+        assertEquals(job.getId(), event.getId());
+        assertEquals(job.getUser(), event.getUser());
+        assertEquals(job.getAppName(), event.getAppName());
+        assertEquals(job.getStartTime(), event.getStartTime());
+        assertEquals(job.getEndTime(), event.getEndTime());
+        assertEquals(0, queue.getCurrentSize());
+
+    }
+
+    public void testCoordinatorActionEvent() throws Exception {
+        EventHandlerService ehs = _testEventHandlerService();
+        // reduce noise from WF Job events (also default) by setting it to only
+        // coord action
+        ehs.setAppTypes(new HashSet<String>(Arrays.asList("coordinator_action")));
+        EventQueue queue = ehs.getEventQueue();
+        assertEquals(0, queue.getCurrentSize());
+        Date startTime = DateUtils.parseDateOozieTZ("2013-01-01T10:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:14Z");
+        CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
+                false, 0);
+        _modifyCoordForRunning(coord);
+        final JPAService jpaService = Services.get().get(JPAService.class);
+
+        // Action WAITING on materialization
+        new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
+        final CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(coord.getId() + "@1");
+        CoordinatorActionBean action = jpaService.execute(coordGetCmd);
+        assertEquals(CoordinatorAction.Status.WAITING, action.getStatus());
+        assertEquals(1, queue.getCurrentSize());
+        CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.WAITING, event.getEventStatus());
+        assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
+        assertEquals(action.getId(), event.getId());
+        assertEquals(action.getJobId(), event.getParentId());
+        assertEquals(action.getNominalTime(), event.getNominalTime());
+        assertEquals(action.getCreatedTime(), event.getStartTime());
+        assertNotNull(event.getMissingDeps());
+        assertEquals(coord.getUser(), event.getUser());
+        assertEquals(coord.getAppName(), event.getAppName());
+        assertEquals(0, queue.getCurrentSize());
+
+        // Make Action ready
+        new CoordActionInputCheckXCommand(action.getId(), coord.getId()).call();
+        action = jpaService.execute(coordGetCmd);
+        assertEquals(CoordinatorAction.Status.READY, action.getStatus());
+
+        waitFor(1 * 100, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return jpaService.execute(coordGetCmd).getStatus() == CoordinatorAction.Status.RUNNING;
+            }
+        });
+        assertEquals(1, queue.getCurrentSize());
+        event = (CoordinatorActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.STARTED, event.getEventStatus());
+        assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
+        assertEquals(action.getId(), event.getId());
+        assertEquals(action.getJobId(), event.getParentId());
+        assertEquals(action.getNominalTime(), event.getNominalTime());
+        assertEquals(action.getCreatedTime(), event.getStartTime());
+        assertEquals(coord.getUser(), event.getUser());
+        assertEquals(coord.getAppName(), event.getAppName());
+        assertEquals(0, queue.getCurrentSize());
+
+        // Action Success
+        action = jpaService.execute(coordGetCmd);
+        WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
+        wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+        new CoordActionCheckXCommand(action.getId(), 0).call();
+        action = jpaService.execute(coordGetCmd);
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
+        assertEquals(1, queue.getCurrentSize());
+        event = (CoordinatorActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.SUCCESS, event.getEventStatus());
+        assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
+        assertEquals(action.getId(), event.getId());
+        assertEquals(action.getJobId(), event.getParentId());
+        assertEquals(action.getNominalTime(), event.getNominalTime());
+        assertEquals(action.getCreatedTime(), event.getStartTime());
+        assertEquals(coord.getUser(), event.getUser());
+        assertEquals(coord.getAppName(), event.getAppName());
+        assertEquals(0, queue.getCurrentSize());
+
+        // Action Failure: put the action back to RUNNING, then kill its workflow
+        action.setStatus(CoordinatorAction.Status.RUNNING);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        wfJob.setStatus(WorkflowJob.Status.KILLED);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+        new CoordActionCheckXCommand(action.getId(), 0).call();
+        action = jpaService.execute(coordGetCmd);
+        assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
+        assertEquals(1, queue.getCurrentSize());
+        event = (CoordinatorActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.FAILURE, event.getEventStatus());
+        assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
+        assertEquals(action.getId(), event.getId());
+        assertEquals(action.getJobId(), event.getParentId());
+        assertEquals(action.getNominalTime(), event.getNominalTime());
+        assertEquals(action.getCreatedTime(), event.getStartTime());
+        assertEquals(coord.getUser(), event.getUser());
+        assertEquals(coord.getAppName(), event.getAppName());
+        assertEquals(0, queue.getCurrentSize());
+
+    }
+
+    public void testWorkflowJobEventError() throws Exception {
+        EventHandlerService ehs = _testEventHandlerService();
+        EventQueue queue = ehs.getEventQueue();
+        final WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
+        // create event with error code and message
+        WorkflowXCommand<Void> myCmd = new KillXCommand(job.getId()) {
+            @Override
+            protected Void execute() {
+                WorkflowXCommand.generateEvent(job, "errorCode", "errorMsg");
+                return null;
+            }
+        };
+        myCmd.call();
+        WorkflowJobEvent event = (WorkflowJobEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals("errorCode", event.getErrorCode());
+        assertEquals("errorMsg", event.getErrorMessage());
+        assertEquals(EventStatus.FAILURE, event.getEventStatus());
+
+    }
+
+    public void testCoordinatorActionEventDependencies() throws Exception {
+        EventHandlerService ehs = _testEventHandlerService();
+        EventQueue queue = ehs.getEventQueue();
+        final CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        final CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1,
+                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
+            @Override
+            protected Void execute() {
+                CoordinatorXCommand.generateEvent(action, coord.getUser(), coord.getAppName());
+                return null;
+            }
+        };
+
+        // CASE 1: Only pull missing deps
+        action.setMissingDependencies("pull");
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        myCmd.call();
+        CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals("pull", event.getMissingDeps());
+
+        // CASE 2: Only hcat (push) missing deps
+        action.setMissingDependencies(null);
+        action.setPushMissingDependencies("push");
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        myCmd.call();
+        event = (CoordinatorActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals("push", event.getMissingDeps());
+
+        // CASE 3: Both types
+        action.setMissingDependencies("pull");
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        myCmd.call();
+        event = (CoordinatorActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals("pull" + CoordELFunctions.INSTANCE_SEPARATOR + "push", event.getMissingDeps());
+
+    }
+
+    private EventHandlerService _testEventHandlerService() throws Exception {
+        Services services = Services.get();
+        EventHandlerService ehs = services.get(EventHandlerService.class);
+        assertNotNull(ehs);
+        return ehs;
+    }
+
+    private void _modifyCoordForRunning(CoordinatorJobBean coord) throws Exception {
+        String wfXml = IOUtils.getResourceAsString("wf-credentials.xml", -1);
+        writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
+        String coordXml = coord.getJobXml();
+        coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
+        Services.get().get(JPAService.class).execute(new CoordJobUpdateJPAExecutor(coord));
+    }
+
+    private WorkflowJobBean _createWorkflowJob() throws Exception {
+        LiteWorkflowApp app = new LiteWorkflowApp("my-app", "<workflow-app/>",
+                new StartNodeDef(TestControlNodeHandler.class, "one"))
+                .addNode(new ActionNodeDef("one", "<java></java>", TestActionNodeHandler.class, "end", "end"))
+                .addNode(new EndNodeDef("end", TestControlNodeHandler.class));
+        Configuration conf = new Configuration();
+        Path appUri = new Path(getAppPath(), "workflow.xml");
+        conf.set(OozieClient.APP_PATH, appUri.toString());
+        conf.set(OozieClient.LOG_TOKEN, "testToken");
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        WorkflowJobBean workflow = createWorkflow(app, conf, "auth", WorkflowJob.Status.PREP,
+                WorkflowInstance.Status.PREP);
+        String executionPath = "/";
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(workflow);
+        jpaService.execute(wfInsertCmd);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(workflow.getId(), "one", WorkflowAction.Status.OK,
+                executionPath);
+        wfAction.setPending();
+        wfAction.setSignalValue(WorkflowAction.Status.OK.name());
+        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
+
+        return workflow;
+    }
+
+}

Added: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Test case to check correct functioning of MemoryEventQueue
+ */
+public class TestEventQueue extends XDataTestCase {
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        Services services = new Services();
+        Configuration conf = services.getConf();
+        conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
+        services.init();
+    }
+
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    public void testMemoryEventQueueBasic() throws Exception {
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        assertNotNull(ehs);
+        EventQueue eventQ = ehs.getEventQueue();
+        assertNotNull(eventQ);
+        assertTrue(eventQ instanceof MemoryEventQueue); //default
+    }
+
+    public void testQueueOperations() throws Exception {
+        Services services = Services.get();
+        Configuration conf = services.getConf();
+
+        // set smaller batch size for the events queue, then re-init the service to pick it up
+        conf.setInt(EventHandlerService.CONF_BATCH_SIZE, 3);
+        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        ehs.init(services);
+        EventQueue eventQ = ehs.getEventQueue();
+        assertEquals(0, eventQ.getCurrentSize());
+        assertEquals(3, eventQ.getBatchSize());
+
+        // create some events to enqueue
+        WorkflowJobEvent wfEvent = new WorkflowJobEvent("1234-W", "1234-C", WorkflowJob.Status.RUNNING, getTestUser(),
+                "myapp", null, null);
+        for (int i = 0; i < 10; i++)
+            ehs.queueEvent(wfEvent);
+        assertEquals(10, eventQ.getCurrentSize());
+
+        // test a single worker polling from queue (run() executes in the calling thread)
+        int numThreads = 1;
+        Thread[] thread = new Thread[numThreads];
+        for (int i = 0; i < numThreads; i++) {
+            thread[i] = new Thread(ehs.new EventWorker());
+            thread[i].run();
+        }
+        assertEquals(7, eventQ.getCurrentSize()); // n(events) - n(batch) i.e.
+                                                  // 10-3 = 7
+
+        // restore events count to 10
+        for (int i = 0; i < 3; i++)
+            ehs.queueEvent(wfEvent);
+        assertEquals(10, eventQ.getCurrentSize());
+        // test two workers polling from queue; run() is synchronous, so one after the other
+        numThreads = 2;
+        thread = new Thread[numThreads];
+        for (int i = 0; i < numThreads; i++) {
+            thread[i] = new Thread(ehs.new EventWorker());
+            thread[i].run();
+        }
+        assertEquals(4, eventQ.getCurrentSize()); // n(events) - n(batch)*n(threads)
+                                                  // i.e. 10 - 3*2 = 4
+
+        // enqueue events again
+        for (int i = 0; i < 6; i++)
+            ehs.queueEvent(wfEvent);
+        assertEquals(10, eventQ.getCurrentSize());
+        // test the 2 workers draining repeatedly (mimicking SchedulerService)
+        // from queue; run() (not start()) is what allows re-running the same Thread objects
+        int repetition = 3;
+        int r = 0;
+        while (r < repetition) {
+            if (eventQ.isEmpty())
+                break;
+            for (int i = 0; i < numThreads; i++) {
+                thread[i].run();
+            }
+            r++;
+        }
+        assertEquals(0, eventQ.getCurrentSize());
+    }
+
+}

Added: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.event.BundleJobEvent;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
+import org.apache.oozie.event.MemoryEventQueue;
+import org.apache.oozie.event.WorkflowActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.event.listener.JobEventListener;
+import org.apache.oozie.test.XDataTestCase;
+
+public class TestEventHandlerService extends XDataTestCase {
+
+    StringBuilder output = new StringBuilder();
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        // Point CONF_SERVICE_EXT_CLASSES at EventHandlerService before the services are initialized.
+        Services svc = new Services();
+        svc.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
+        svc.init();
+    }
+
+    protected void tearDown() throws Exception {
+        // Always run the base-class cleanup, even if destroying the services throws.
+        try { Services.get().destroy(); } finally { super.tearDown(); }
+    }
+
+    public void testService() throws Exception {
+        EventHandlerService service = _testEventHandlerService();
+        assertTrue(service.getEventQueue() instanceof MemoryEventQueue); // default queue implementation
+        Set<String> appTypes = service.getAppTypes();
+        for (String expected : new String[] { "workflow_job", "coordinator_action" }) {
+            assertTrue(appTypes.contains(expected)); // default app types
+        }
+    }
+
+    /**
+     * Queues workflow job and coordinator action events one status at a time and
+     * verifies that each one reaches the matching listener callback.
+     */
+    public void testEventListener() throws Exception {
+        EventHandlerService ehs = _testEventHandlerService();
+        ehs.addEventListener(new DummyJobEventListener());
+
+        /*
+         * Workflow Job events
+         */
+        WorkflowJobEvent event = new WorkflowJobEvent("jobid", "parentid", WorkflowJob.Status.RUNNING, getTestUser(),
+                "myapp", null, null);
+        ehs.queueEvent(event);
+        assertDispatched(ehs, "Workflow Job STARTED");
+
+        event.setStatus(WorkflowJob.Status.SUSPENDED);
+        ehs.queueEvent(event);
+        assertDispatched(ehs, "Workflow Job SUSPEND");
+
+        event.setStatus(WorkflowJob.Status.SUCCEEDED);
+        ehs.queueEvent(event);
+        assertDispatched(ehs, "Workflow Job SUCCESS");
+
+        event.setStatus(WorkflowJob.Status.KILLED);
+        ehs.queueEvent(event);
+        assertDispatched(ehs, "Workflow Job FAILURE");
+
+        /*
+         * Coordinator Action events
+         */
+        CoordinatorActionEvent event2 = new CoordinatorActionEvent("jobid", "parentid",
+                CoordinatorAction.Status.WAITING, getTestUser(), "myapp", null, null, null);
+        ehs.queueEvent(event2);
+        assertDispatched(ehs, "Coord Action WAITING");
+
+        event2.setStatus(CoordinatorAction.Status.RUNNING);
+        ehs.queueEvent(event2);
+        assertDispatched(ehs, "Coord Action START");
+
+        event2.setStatus(CoordinatorAction.Status.SUSPENDED);
+        ehs.queueEvent(event2);
+        assertDispatched(ehs, "Coord Action SUSPEND");
+
+        event2.setStatus(CoordinatorAction.Status.SUCCEEDED);
+        ehs.queueEvent(event2);
+        assertDispatched(ehs, "Coord Action SUCCESS");
+
+        event2.setStatus(CoordinatorAction.Status.TIMEDOUT);
+        ehs.queueEvent(event2);
+        assertDispatched(ehs, "Coord Action FAILURE");
+
+        event2.setStatus(CoordinatorAction.Status.KILLED);
+        ehs.queueEvent(event2);
+        assertDispatched(ehs, "Coord Action FAILURE");
+    }
+
+    /**
+     * Runs one {@code EventWorker} pass on the calling thread, checks that the
+     * listener output contains the expected marker, then clears the output so
+     * the next check starts from an empty buffer.
+     *
+     * @param ehs service whose worker is run
+     * @param expected text the dummy listener must have appended
+     */
+    private void assertDispatched(EventHandlerService ehs, String expected) {
+        ehs.new EventWorker().run();
+        String actual = output.toString();
+        assertTrue("Expected listener output containing [" + expected + "] but was [" + actual + "]",
+                actual.contains(expected));
+        output.setLength(0);
+    }
+
+    private EventHandlerService _testEventHandlerService() throws Exception {
+        // Look the service up through the Services singleton; it must be present.
+        EventHandlerService handler = Services.get().get(EventHandlerService.class);
+        assertNotNull(handler);
+        return handler;
+    }
+
+    /** Listener stub that appends a marker string to the test's {@code output} for every non-null event. */
+    class DummyJobEventListener extends JobEventListener {
+        /** Records a workflow job start notification. */
+        @Override
+        public void onWorkflowJobStart(WorkflowJobEvent wje) {
+            if (wje == null) { return; }
+            output.append("Dummy Workflow Job STARTED");
+        }
+
+        /** Records a workflow job success notification. */
+        @Override
+        public void onWorkflowJobSuccess(WorkflowJobEvent wje) {
+            if (wje == null) { return; }
+            output.append("Dummy Workflow Job SUCCESS");
+        }
+
+        /** Records a workflow job failure notification. */
+        @Override
+        public void onWorkflowJobFailure(WorkflowJobEvent wje) {
+            if (wje == null) { return; }
+            output.append("Dummy Workflow Job FAILURE");
+        }
+
+        /** Records a workflow job suspend notification. */
+        @Override
+        public void onWorkflowJobSuspend(WorkflowJobEvent wje) {
+            if (wje == null) { return; }
+            output.append("Dummy Workflow Job SUSPEND");
+        }
+
+        /** Records a workflow action start notification. */
+        @Override
+        public void onWorkflowActionStart(WorkflowActionEvent wae) {
+            if (wae == null) { return; }
+            output.append("Dummy Workflow Action START");
+        }
+
+        /** Records a workflow action success notification. */
+        @Override
+        public void onWorkflowActionSuccess(WorkflowActionEvent wae) {
+            if (wae == null) { return; }
+            output.append("Dummy Workflow Action SUCCESS");
+        }
+
+        /** Records a workflow action failure notification. */
+        @Override
+        public void onWorkflowActionFailure(WorkflowActionEvent wae) {
+            if (wae == null) { return; }
+            output.append("Dummy Workflow Action FAILURE");
+        }
+
+        /** Records a workflow action suspend notification. */
+        @Override
+        public void onWorkflowActionSuspend(WorkflowActionEvent wae) {
+            if (wae == null) { return; }
+            output.append("Dummy Workflow Action SUSPEND");
+        }
+
+        /** Records a coordinator job start notification. */
+        @Override
+        public void onCoordinatorJobStart(CoordinatorJobEvent cje) {
+            if (cje == null) { return; }
+            output.append("Dummy Coord Job START");
+        }
+
+        /** Records a coordinator job success notification. */
+        @Override
+        public void onCoordinatorJobSuccess(CoordinatorJobEvent cje) {
+            if (cje == null) { return; }
+            output.append("Dummy Coord Job SUCCESS");
+        }
+
+        /** Records a coordinator job failure notification. */
+        @Override
+        public void onCoordinatorJobFailure(CoordinatorJobEvent cje) {
+            if (cje == null) { return; }
+            output.append("Dummy Coord Job FAILURE");
+        }
+
+        /** Records a coordinator job suspend notification. */
+        @Override
+        public void onCoordinatorJobSuspend(CoordinatorJobEvent cje) {
+            if (cje == null) { return; }
+            output.append("Dummy Coord Job SUSPEND");
+        }
+
+        /** Records a coordinator action waiting notification. */
+        @Override
+        public void onCoordinatorActionWaiting(CoordinatorActionEvent cae) {
+            if (cae == null) { return; }
+            output.append("Dummy Coord Action WAITING");
+        }
+
+        /** Records a coordinator action start notification. */
+        @Override
+        public void onCoordinatorActionStart(CoordinatorActionEvent cae) {
+            if (cae == null) { return; }
+            output.append("Dummy Coord Action START");
+        }
+
+        /** Records a coordinator action success notification. */
+        @Override
+        public void onCoordinatorActionSuccess(CoordinatorActionEvent cae) {
+            if (cae == null) { return; }
+            output.append("Dummy Coord Action SUCCESS");
+        }
+
+        /** Records a coordinator action failure notification. */
+        @Override
+        public void onCoordinatorActionFailure(CoordinatorActionEvent cae) {
+            if (cae == null) { return; }
+            output.append("Dummy Coord Action FAILURE");
+        }
+
+        /** Records a coordinator action suspend notification. */
+        @Override
+        public void onCoordinatorActionSuspend(CoordinatorActionEvent cae) {
+            if (cae == null) { return; }
+            output.append("Dummy Coord Action SUSPEND");
+        }
+
+        /** Records a bundle job start notification. */
+        @Override
+        public void onBundleJobStart(BundleJobEvent bje) {
+            if (bje == null) { return; }
+            output.append("Dummy Bundle Job START");
+        }
+
+        /** Records a bundle job success notification. */
+        @Override
+        public void onBundleJobSuccess(BundleJobEvent bje) {
+            if (bje == null) { return; }
+            output.append("Dummy Bundle Job SUCCESS");
+        }
+
+        /** Records a bundle job failure notification. */
+        @Override
+        public void onBundleJobFailure(BundleJobEvent bje) {
+            if (bje == null) { return; }
+            output.append("Dummy Bundle Job FAILURE");
+        }
+
+        /** Records a bundle job suspend notification. */
+        @Override
+        public void onBundleJobSuspend(BundleJobEvent bje) {
+            if (bje == null) { return; }
+            output.append("Dummy Bundle Job SUSPEND");
+        }
+
+        /** Intentionally empty. */
+        @Override
+        public void init() { }
+
+        /** Intentionally empty. */
+        @Override
+        public void destroy() { }
+
+    }
+
+}

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Apr 10 00:51:43 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1209 Event generation and handling for workflow and coordinator (mona)
 OOZIE-1118 improve logic of purge service (rkanter)
 OOZIE-1205 If the JobTracker is restarted during a Fork, Oozie doesn't fail all of the currently running actions (rkanter)
 OOZIE-1286 SSH Action does not properly handle arguments that have spaces (rkanter)