You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/06/05 20:34:19 UTC

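svn commit: r1489996 - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/command/wf/ core/src/main/java/org/apache/oozie/event/ core/src/main/java/org/apache/oozie/sla/ core/src/main/java/org/apache/oozie/sla/listener/ core/src/main/java/org/apache/oozie/sla/service/ core/src/main/resources/ core/src/test/java/org/apache/oozie/event/ core/src/test/java/org/apache/oozie/executor/jpa/ core/src/test/java/org/apache/oozie/service/ core/src/test/java/org/apache/oozie/sla/ core/src/test/java/org/apache/oozie/test/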

Author: mona
Date: Wed Jun  5 18:34:19 2013
New Revision: 1489996

URL: http://svn.apache.org/r1489996
Log:
OOZIE-1375 Generate Job notification events for Workflow Actions (mona)

Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
    oozie/trunk/core/src/main/resources/oozie-default.xml
    oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
    oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
    oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
    oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java Wed Jun  5 18:34:19 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,7 +30,6 @@ import org.apache.oozie.action.ActionExe
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.WorkflowAction.Status;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
@@ -41,6 +40,7 @@ import org.apache.oozie.executor.jpa.Wor
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.service.ActionCheckerService;
 import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
@@ -63,6 +63,7 @@ public class ActionCheckXCommand extends
     private JPAService jpaService = null;
     private ActionExecutor executor = null;
     private List<JsonBean> updateList = new ArrayList<JsonBean>();
+    private boolean generateEvent = false;
 
     public ActionCheckXCommand(String actionId) {
         this(actionId, -1);
@@ -183,6 +184,7 @@ public class ActionCheckXCommand extends
                     wfAction.setErrorInfo(EXEC_DATA_MISSING,
                             "Execution Complete, but Execution Data Missing from Action");
                     failJob(context);
+                    generateEvent = true;
                 } else {
                     wfAction.setPending();
                     queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
@@ -198,10 +200,10 @@ public class ActionCheckXCommand extends
                     .getMessage(), ex);
 
             wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
-
             switch (ex.getErrorType()) {
                 case FAILED:
                     failJob(context, wfAction);
+                    generateEvent = true;
                     break;
                 case ERROR:
                     handleUserRetry(wfAction);
@@ -209,6 +211,7 @@ public class ActionCheckXCommand extends
                 case TRANSIENT:                 // retry N times, then suspend workflow
                     if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
                         handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
+                        generateEvent = true;
                         wfAction.setPendingAge(new Date());
                         wfAction.setRetries(0);
                         wfAction.setStartTime(null);
@@ -224,6 +227,9 @@ public class ActionCheckXCommand extends
         finally {
             try {
                 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+                if (generateEvent && EventHandlerService.isEnabled()) {
+                    generateEvent(wfAction, wfJob.getUser());
+                }
             }
             catch (JPAExecutorException e) {
                 throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Wed Jun  5 18:34:19 2013
@@ -44,6 +44,7 @@ import org.apache.oozie.executor.jpa.JPA
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
@@ -256,6 +257,9 @@ public class ActionEndXCommand extends A
         finally {
             try {
                 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+                if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
+                    generateEvent(wfAction, wfJob.getUser());
+                }
             }
             catch (JPAExecutorException e) {
                 throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java Wed Jun  5 18:34:19 2013
@@ -37,7 +37,9 @@ import org.apache.oozie.executor.jpa.Wor
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.control.ControlNodeActionExecutor;
 import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.service.Services;
@@ -49,6 +51,7 @@ import org.apache.oozie.util.db.SLADbXOp
  * Kill workflow action and invoke action executor to kill the underlying context.
  *
  */
+@SuppressWarnings("deprecation")
 public class ActionKillXCommand extends ActionXCommand<Void> {
     private String actionId;
     private String jobId;
@@ -160,6 +163,9 @@ public class ActionKillXCommand extends 
                 finally {
                     try {
                         jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+                        if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
+                            generateEvent(wfAction, wfJob.getUser());
+                        }
                     }
                     catch (JPAExecutorException e) {
                         throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Wed Jun  5 18:34:19 2013
@@ -32,7 +32,6 @@ import org.apache.oozie.XException;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.action.control.ControlNodeActionExecutor;
-import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
@@ -47,6 +46,7 @@ import org.apache.oozie.executor.jpa.JPA
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
@@ -57,6 +57,7 @@ import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.util.db.SLADbXOperations;
 
+@SuppressWarnings("deprecation")
 public class ActionStartXCommand extends ActionXCommand<Void> {
     public static final String EL_ERROR = "EL_ERROR";
     public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
@@ -303,6 +304,9 @@ public class ActionStartXCommand extends
         finally {
             try {
                 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+                if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
+                    generateEvent(wfAction, wfJob.getUser());
+                }
             }
             catch (JPAExecutorException e) {
                 throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Wed Jun  5 18:34:19 2013
@@ -138,7 +138,7 @@ public class KillXCommand extends Workfl
 
                     queue(new ActionKillXCommand(action.getId(), action.getType()));
                 }
-                if (action.getStatus() == WorkflowActionBean.Status.PREP
+                else if (action.getStatus() == WorkflowActionBean.Status.PREP
                         || action.getStatus() == WorkflowActionBean.Status.START_RETRY
                         || action.getStatus() == WorkflowActionBean.Status.START_MANUAL
                         || action.getStatus() == WorkflowActionBean.Status.END_RETRY
@@ -152,6 +152,9 @@ public class KillXCommand extends Workfl
                         insertList.add(slaEvent);
                     }
                     updateList.add(action);
+                    if (EventHandlerService.isEnabled()) {
+                        generateEvent(action, wfJob.getUser());
+                    }
                 }
             }
             wfJob.setLastModifiedTime(new Date());

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Jun  5 18:34:19 2013
@@ -73,7 +73,7 @@ public class SignalXCommand extends Work
     private WorkflowActionBean wfAction;
     private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
-    private boolean generateEvent;
+    private boolean generateEvent = false;
     private String wfJobErrorCode;
     private String wfJobErrorMsg;
 
@@ -325,9 +325,6 @@ public class SignalXCommand extends Work
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
             if (generateEvent && EventHandlerService.isEnabled()) {
                 generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
-                if (wfAction != null) {
-                    generateEvent(wfAction, wfJob.getUser());
-                }
             }
         }
         catch (JPAExecutorException je) {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java Wed Jun  5 18:34:19 2013
@@ -68,12 +68,13 @@ public abstract class WorkflowXCommand<T
     }
 
     protected void generateEvent(WorkflowActionBean wfAction, String wfUser) {
-        // Workflow action events not filtered since required for sla
-        WorkflowActionEvent event = new WorkflowActionEvent(wfAction.getId(), wfAction.getJobId(),
-                wfAction.getStatus(), wfUser, wfAction.getName(), wfAction.getStartTime(), wfAction.getEndTime());
-        event.setErrorCode(wfAction.getErrorCode());
-        event.setErrorMessage(wfAction.getErrorMessage());
-        eventService.queueEvent(event);
+        if (eventService.isSupportedApptype(AppType.WORKFLOW_ACTION.name())) {
+            WorkflowActionEvent event = new WorkflowActionEvent(wfAction.getId(), wfAction.getJobId(),
+                    wfAction.getStatus(), wfUser, wfAction.getName(), wfAction.getStartTime(), wfAction.getEndTime());
+            event.setErrorCode(wfAction.getErrorCode());
+            event.setErrorMessage(wfAction.getErrorMessage());
+            eventService.queueEvent(event);
+        }
     }
 
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java Wed Jun  5 18:34:19 2013
@@ -64,6 +64,7 @@ public class WorkflowActionEvent extends
             case KILLED:
             case FAILED:
                 setEventStatus(EventStatus.FAILURE);
+                break;
             case START_MANUAL:
             case END_MANUAL:
                 setEventStatus(EventStatus.SUSPEND);
@@ -91,7 +92,7 @@ public class WorkflowActionEvent extends
     }
 
     public void setErrorMessage(String msg) {
-        errorCode = msg;
+        errorMessage = msg;
     }
 
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Wed Jun  5 18:34:19 2013
@@ -215,7 +215,7 @@ public class SLACalculatorMemory impleme
         SLACalcStatus slaCalc = slaMap.get(jobId);
         List<JsonBean> updateList = new ArrayList<JsonBean>();
         SLASummaryBean slaInfo = null;
-        boolean ret = false;
+        boolean hasSla = false;
         if (slaCalc != null) {
             synchronized (slaCalc) {
                 byte eventProc = slaCalc.getEventProcessed();
@@ -244,7 +244,7 @@ public class SLACalculatorMemory impleme
                 if (slaInfo.getSlaProcessed() == 2) {
                     slaMap.remove(jobId);
                 }
-                ret = true;
+                hasSla = true;
             }
         }
         else if (historySet.contains(jobId)) {
@@ -257,16 +257,19 @@ public class SLACalculatorMemory impleme
             }
             slaInfo.setSlaProcessed(2);
             historySet.remove(jobId);
-            ret = true;
+            hasSla = true;
         }
-        slaInfo.setLastModifiedTime(new Date());
-        updateList.add(slaInfo);
-        if (jpa != null) {
-            jpa.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
+        if (hasSla) {
+            slaInfo.setLastModifiedTime(new Date());
+            updateList.add(slaInfo);
+            if (jpa != null) {
+                jpa.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
+            }
+            XLog.getLog(SLAService.class)
+                    .trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
         }
-        XLog.getLog(SLAService.class).trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
 
-        return ret;
+        return hasSla;
     }
 
     /**

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java Wed Jun  5 18:34:19 2013
@@ -68,13 +68,13 @@ public class SLAJobEventListener extends
     }
 
     private void sendEventToSLAService(JobEvent event, String status) {
-        SLAService slaService = Services.get().get(SLAService.class);
-        if (slaService != null && !status.equals(CoordinatorAction.Status.WAITING.name())
+        if (!status.equals(CoordinatorAction.Status.WAITING.name())
                 && !status.equals(CoordinatorAction.Status.SUSPENDED.name())) {
             Date startTime = event.getStartTime();
             Date endTime = event.getEndTime();
             try {
-                slaService.addStatusEvent(event.getId(), status, event.getEventStatus(), startTime, endTime);
+                Services.get().get(SLAService.class)
+                        .addStatusEvent(event.getId(), status, event.getEventStatus(), startTime, endTime);
             }
             catch (ServiceException se) {
                 XLog.getLog(SLAService.class).error("Exception happened while sending Job-Status event for SLA", se);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java Wed Jun  5 18:34:19 2013
@@ -31,7 +31,6 @@ import org.apache.oozie.sla.SLACalculato
 import org.apache.oozie.sla.SLACalculatorMemory;
 import org.apache.oozie.sla.SLARegistrationBean;
 import org.apache.oozie.util.XLog;
-
 import com.google.common.annotations.VisibleForTesting;
 
 public class SLAService implements Service {
@@ -60,6 +59,9 @@ public class SLAService implements Servi
                         + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
             }
             LOG = XLog.getLog(getClass());
+            java.util.Set<String> appTypes = eventHandler.getAppTypes();
+            appTypes.add("workflow_action");
+            eventHandler.setAppTypes(appTypes);
 
             Runnable slaThread = new SLAWorker(calcImpl);
             // schedule runnable by default every 30 sec

Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Wed Jun  5 18:34:19 2013
@@ -1845,9 +1845,7 @@
 
     <property>
         <name>oozie.service.EventHandlerService.event.listeners</name>
-        <value>org.apache.oozie.jms.JMSJobEventListener,
-               org.apache.oozie.sla.listener.SLAJobEventListener
-        </value>
+        <value>org.apache.oozie.jms.JMSJobEventListener</value>
     </property>
 
     <property>

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Wed Jun  5 18:34:19 2013
@@ -33,18 +33,24 @@ import org.apache.oozie.CoordinatorJobBe
 import org.apache.oozie.DagEngine;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.action.control.ControlNodeActionExecutor;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.event.Event;
 import org.apache.oozie.client.event.JobEvent;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.coord.CoordActionCheckXCommand;
 import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
 import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
 import org.apache.oozie.command.coord.CoordinatorXCommand;
+import org.apache.oozie.command.wf.ActionCheckXCommand;
+import org.apache.oozie.command.wf.ActionKillXCommand;
+import org.apache.oozie.command.wf.ActionStartXCommand;
+import org.apache.oozie.command.wf.ActionXCommand;
 import org.apache.oozie.command.wf.KillXCommand;
 import org.apache.oozie.command.wf.ResumeXCommand;
 import org.apache.oozie.command.wf.SignalXCommand;
@@ -56,11 +62,13 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
@@ -88,6 +96,7 @@ public class TestEventGeneration extends
 
     EventQueue queue;
     Services services;
+    EventHandlerService ehs;
 
     @Override
     @Before
@@ -97,7 +106,8 @@ public class TestEventGeneration extends
         Configuration conf = services.getConf();
         conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
         services.init();
-        queue = services.get(EventHandlerService.class).getEventQueue();
+        ehs = services.get(EventHandlerService.class);
+        queue = ehs.getEventQueue();
     }
 
     @Override
@@ -119,7 +129,7 @@ public class TestEventGeneration extends
         job = jpaService.execute(wfJobGetCmd);
         assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
         assertEquals(1, queue.size());
-        WorkflowJobEvent event = (WorkflowJobEvent) queue.poll();
+        JobEvent event = (JobEvent) queue.poll();
         assertNotNull(event);
         assertEquals(EventStatus.STARTED, event.getEventStatus());
         assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
@@ -134,7 +144,7 @@ public class TestEventGeneration extends
         job = jpaService.execute(wfJobGetCmd);
         assertEquals(WorkflowJob.Status.SUSPENDED, job.getStatus());
         assertEquals(1, queue.size());
-        event = (WorkflowJobEvent) queue.poll();
+        event = (JobEvent) queue.poll();
         assertNotNull(event);
         assertEquals(EventStatus.SUSPEND, event.getEventStatus());
         assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
@@ -148,7 +158,7 @@ public class TestEventGeneration extends
         job = jpaService.execute(wfJobGetCmd);
         assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
         assertEquals(1, queue.size());
-        event = (WorkflowJobEvent) queue.poll();
+        event = (JobEvent) queue.poll();
         assertNotNull(event);
         assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
         assertEquals(job.getId(), event.getId());
@@ -162,7 +172,7 @@ public class TestEventGeneration extends
         job = jpaService.execute(wfJobGetCmd);
         assertEquals(WorkflowJob.Status.KILLED, job.getStatus());
         assertEquals(1, queue.size());
-        event = (WorkflowJobEvent) queue.poll();
+        event = (JobEvent) queue.poll();
         assertNotNull(event);
         assertEquals(EventStatus.FAILURE, event.getEventStatus());
         assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
@@ -184,7 +194,7 @@ public class TestEventGeneration extends
         job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
         assertEquals(WorkflowJob.Status.SUCCEEDED, job.getStatus());
         assertEquals(1, queue.size());
-        event = (WorkflowJobEvent) queue.poll();
+        event = (JobEvent) queue.poll();
         assertNotNull(event);
         assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
         assertEquals(job.getId(), event.getId());
@@ -192,16 +202,14 @@ public class TestEventGeneration extends
         assertEquals(job.getAppName(), event.getAppName());
         assertEquals(job.getStartTime(), event.getStartTime());
         assertEquals(job.getEndTime(), event.getEndTime());
-        assertEquals(0, queue.size());
 
     }
 
     @Test
     public void testCoordinatorActionEvent() throws Exception {
-        EventHandlerService ehs = services.get(EventHandlerService.class);
-        // reduce noise from WF Job events (also default) by setting it to only
+        // avoid noise from other apptype events by setting it to only
         // coord action
-        ehs.setAppTypes(new HashSet<String>(Arrays.asList(new String[] { "coordinator_action" })));
+        ehs.setAppTypes(new HashSet<String>(Arrays.asList("coordinator_action")));
         assertEquals(queue.size(), 0);
         Date startTime = DateUtils.parseDateOozieTZ("2013-01-01T10:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:14Z");
@@ -216,13 +224,13 @@ public class TestEventGeneration extends
         CoordinatorActionBean action = jpaService.execute(coordGetCmd);
         assertEquals(CoordinatorAction.Status.WAITING, action.getStatus());
         assertEquals(1, queue.size());
-        CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
+        JobEvent event = (JobEvent) queue.poll();
         assertNotNull(event);
         assertEquals(EventStatus.WAITING, event.getEventStatus());
         assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
         assertEquals(action.getId(), event.getId());
         assertEquals(action.getJobId(), event.getParentId());
-        assertEquals(action.getNominalTime(), event.getNominalTime());
+        assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
         assertEquals(action.getCreatedTime(), event.getStartTime());
         assertEquals(coord.getUser(), event.getUser());
         assertEquals(coord.getAppName(), event.getAppName());
@@ -233,19 +241,19 @@ public class TestEventGeneration extends
         action = jpaService.execute(coordGetCmd);
         assertEquals(CoordinatorAction.Status.READY, action.getStatus());
 
-        waitFor(1 * 100, new Predicate() {
+        waitFor(4 * 100, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 return jpaService.execute(coordGetCmd).getStatus() == CoordinatorAction.Status.RUNNING;
             }
         });
 
-        event = _pollQueue();
+        event = (JobEvent) queue.poll();
         assertEquals(EventStatus.STARTED, event.getEventStatus());
         assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
         assertEquals(action.getId(), event.getId());
         assertEquals(action.getJobId(), event.getParentId());
-        assertEquals(action.getNominalTime(), event.getNominalTime());
+        assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
         assertEquals(action.getCreatedTime(), event.getStartTime());
         assertEquals(coord.getUser(), event.getUser());
         assertEquals(coord.getAppName(), event.getAppName());
@@ -256,14 +264,15 @@ public class TestEventGeneration extends
         wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
         jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
         new CoordActionCheckXCommand(action.getId(), 0).call();
+        Thread.sleep(300);
         action = jpaService.execute(coordGetCmd);
         assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
-        event = _pollQueue();
+        event = (JobEvent) queue.poll();
         assertEquals(EventStatus.SUCCESS, event.getEventStatus());
         assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
         assertEquals(action.getId(), event.getId());
         assertEquals(action.getJobId(), event.getParentId());
-        assertEquals(action.getNominalTime(), event.getNominalTime());
+        assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
         assertEquals(action.getCreatedTime(), event.getStartTime());
         assertEquals(coord.getUser(), event.getUser());
         assertEquals(coord.getAppName(), event.getAppName());
@@ -271,17 +280,17 @@ public class TestEventGeneration extends
         // Action Failure
         action.setStatus(CoordinatorAction.Status.RUNNING);
         jpaService.execute(new CoordActionUpdateJPAExecutor(action));
-        wfJob.setStatus(WorkflowJob.Status.KILLED);
+        wfJob.setStatus(WorkflowJob.Status.FAILED);
         jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
         new CoordActionCheckXCommand(action.getId(), 0).call();
         action = jpaService.execute(coordGetCmd);
-        assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
-        event = _pollQueue();
+        assertEquals(CoordinatorAction.Status.FAILED, action.getStatus());
+        event = (JobEvent) queue.poll();
         assertEquals(EventStatus.FAILURE, event.getEventStatus());
         assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
         assertEquals(action.getId(), event.getId());
         assertEquals(action.getJobId(), event.getParentId());
-        assertEquals(action.getNominalTime(), event.getNominalTime());
+        assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
         assertEquals(action.getCreatedTime(), event.getStartTime());
         assertEquals(coord.getUser(), event.getUser());
         assertEquals(coord.getAppName(), event.getAppName());
@@ -289,6 +298,70 @@ public class TestEventGeneration extends
     }
 
     @Test
+    public void testWorkflowActionEvent() throws Exception {
+        assertEquals(queue.size(), 0);
+        // avoid noise from events of other app types by restricting the
+        // configured app types to workflow actions only
+        ehs.setAppTypes(new HashSet<String>(Arrays.asList("workflow_action")));
+        WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP, true);
+        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
+        JPAService jpaService = Services.get().get(JPAService.class);
+
+        // Starting action
+        new ActionStartXCommand(action.getId(), "map-reduce").call();
+        action = jpaService.execute(wfActionGetCmd);
+        assertEquals(WorkflowAction.Status.RUNNING, action.getStatus());
+        assertEquals(1, queue.size());
+        WorkflowActionEvent event = (WorkflowActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.STARTED, event.getEventStatus());
+        assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
+        assertEquals(action.getId(), event.getId());
+        assertEquals(job.getUser(), event.getUser());
+        assertEquals(action.getName(), event.getAppName());
+        assertEquals(action.getStartTime(), event.getStartTime());
+        assertEquals(0, queue.size());
+
+        // Suspending action (status START_MANUAL)
+        ActionExecutor.Context context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
+        ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
+        ActionCheckXCommandForTest dac = new ActionCheckXCommandForTest(context, executor, action.getId());
+        dac.execute();
+        action = dac.getAction();
+        assertEquals(WorkflowAction.Status.START_MANUAL, action.getStatus());
+        assertEquals(1, queue.size());
+        event = (WorkflowActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.SUSPEND, event.getEventStatus());
+        assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
+        assertEquals(action.getId(), event.getId());
+        assertEquals(job.getUser(), event.getUser());
+        assertEquals(action.getName(), event.getAppName());
+        assertEquals(0, queue.size());
+
+        // Killing action
+        action.setStatus(WorkflowAction.Status.KILLED);
+        action.setPendingOnly();
+        jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+        new ActionKillXCommand(action.getId()).call();
+        action = jpaService.execute(wfActionGetCmd);
+        assertEquals(WorkflowAction.Status.KILLED, action.getStatus());
+        assertEquals(1, queue.size());
+        event = (WorkflowActionEvent) queue.poll();
+        assertNotNull(event);
+        assertEquals(EventStatus.FAILURE, event.getEventStatus());
+        assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
+        assertEquals(action.getId(), event.getId());
+        assertEquals(job.getUser(), event.getUser());
+        assertEquals(action.getName(), event.getAppName());
+        assertEquals(action.getStartTime(), event.getStartTime());
+        assertEquals(action.getEndTime(), event.getEndTime());
+        assertEquals(0, queue.size());
+
+    }
+
+    @Test
     public void testWorkflowJobEventError() throws Exception {
         final WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
         // create event with error code and message
@@ -398,6 +471,38 @@ public class TestEventGeneration extends
         assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus());
     }
 
+    private class ActionCheckXCommandForTest extends ActionCheckXCommand {
+
+        ActionExecutor.Context context;
+        ActionExecutor executor;
+        WorkflowActionBean action;
+        JPAService jpa;
+
+        public ActionCheckXCommandForTest(ActionExecutor.Context context, ActionExecutor executor, String actionId)
+                throws JPAExecutorException {
+            super(actionId);
+            this.context = context;
+            this.executor = executor;
+            jpa = Services.get().get(JPAService.class);
+            this.action = jpa.execute(new WorkflowActionGetJPAExecutor(actionId));
+        }
+
+        @Override
+        public Void execute() throws CommandException {
+            handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
+            action = (WorkflowActionBean) ((ActionExecutorContext) context).getAction();
+            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
+                generateEvent(action, getTestUser());
+            }
+            return null;
+        }
+
+        public WorkflowActionBean getAction() {
+            return action;
+        }
+
+    }
+
     private WorkflowJobBean _createWorkflowJob() throws Exception {
         LiteWorkflowApp app = new LiteWorkflowApp("my-app", "<workflow-app/>",
                 new StartNodeDef(TestControlNodeHandler.class, "one"))
@@ -417,7 +522,7 @@ public class TestEventGeneration extends
         WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(workflow);
         jpaService.execute(wfInsertCmd);
         WorkflowActionBean wfAction = addRecordToWfActionTable(workflow.getId(), "one", WorkflowAction.Status.OK,
-                executionPath);
+                executionPath, true);
         wfAction.setPending();
         wfAction.setSignalValue(WorkflowAction.Status.OK.name());
         jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
@@ -425,14 +530,6 @@ public class TestEventGeneration extends
         return workflow;
     }
 
-    private CoordinatorActionEvent _pollQueue() {
-        Event e;
-        do {
-            e = queue.poll();
-        } while (!(e instanceof CoordinatorActionEvent));
-        return (CoordinatorActionEvent) e;
-    }
-
     private void _modifyCoordForFailureAction(CoordinatorJobBean coord) throws Exception {
         String wfXml = IOUtils.getResourceAsString("wf-invalid-fork.xml", -1);
         writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java Wed Jun  5 18:34:19 2013
@@ -45,6 +45,8 @@ public class TestEventQueue extends XDat
                 JMSAccessorService.class.getName() + "," + JMSTopicService.class.getName() + ","
                         + EventHandlerService.class.getName() + "," + SLAService.class.getName());
         conf.setInt(EventHandlerService.CONF_BATCH_SIZE, 3);
+        conf.set(EventHandlerService.CONF_LISTENERS, ""); // this unit test is meant to
+                                                          // target queue operations only
         services.init();
     }
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java Wed Jun  5 18:34:19 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -66,7 +66,7 @@ public class TestWorkflowActionGetJPAExe
             execPath.append("/fork" + i);
         }
         WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP, execPath
-                .toString());
+                .toString(), false);
         _testGetActionWithExecPath(action.getId(), execPath.toString());
     }
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java Wed Jun  5 18:34:19 2013
@@ -21,6 +21,7 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.event.BundleJobEvent;
 import org.apache.oozie.event.CoordinatorActionEvent;
@@ -142,6 +143,47 @@ public class TestEventHandlerService ext
         ehs.new EventWorker().run();
         assertTrue(output.toString().contains("Coord Action event FAILURE"));
         output.setLength(0);
+
+        /*
+         * Workflow Action events
+         */
+        WorkflowActionEvent event3 = new WorkflowActionEvent("waction-1", "parentid",
+                WorkflowAction.Status.RUNNING, getTestUser(), "myapp", null, null);
+        ehs.queueEvent(event3);
+        ehs.new EventWorker().run();
+        assertTrue(output.toString().contains("Workflow Action event STARTED"));
+        output.setLength(0);
+
+        event3.setStatus(WorkflowAction.Status.START_MANUAL);
+        ehs.queueEvent(event3);
+        ehs.new EventWorker().run();
+        assertTrue(output.toString().contains("Workflow Action event SUSPEND"));
+        output.setLength(0);
+
+        event3.setStatus(WorkflowAction.Status.OK);
+        ehs.queueEvent(event3);
+        ehs.new EventWorker().run();
+        assertTrue(output.toString().contains("Workflow Action event SUCCESS"));
+        output.setLength(0);
+
+        event3.setStatus(WorkflowAction.Status.ERROR);
+        ehs.queueEvent(event3);
+        ehs.new EventWorker().run();
+        assertTrue(output.toString().contains("Workflow Action event FAILURE"));
+        output.setLength(0);
+
+        event3.setStatus(WorkflowAction.Status.KILLED);
+        ehs.queueEvent(event3);
+        ehs.new EventWorker().run();
+        assertTrue(output.toString().contains("Workflow Action event FAILURE"));
+        output.setLength(0);
+
+        event3.setStatus(WorkflowAction.Status.FAILED);
+        ehs.queueEvent(event3);
+        ehs.new EventWorker().run();
+        assertTrue(output.toString().contains("Workflow Action event FAILURE"));
+        output.setLength(0);
+
     }
 
     private EventHandlerService _testEventHandlerService() throws Exception {
@@ -171,21 +213,21 @@ public class TestEventHandlerService ext
         @Override
         public void onCoordinatorJobEvent(CoordinatorJobEvent cje) {
             if (cje != null) {
-                output.append("Dummy Coord Job event "+cje.getEventStatus());
+                output.append("Dummy Coord Job event " + cje.getEventStatus());
             }
         }
 
         @Override
         public void onCoordinatorActionEvent(CoordinatorActionEvent cae) {
             if (cae != null) {
-                output.append("Dummy Coord Action event "+cae.getEventStatus());
+                output.append("Dummy Coord Action event " + cae.getEventStatus());
             }
         }
 
         @Override
         public void onBundleJobEvent(BundleJobEvent bje) {
             if (bje != null) {
-                output.append("Dummy Bundle Job event "+bje.getEventStatus());
+                output.append("Dummy Bundle Job event " + bje.getEventStatus());
             }
         }
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Wed Jun  5 18:34:19 2013
@@ -179,7 +179,7 @@ public class TestSLAEventGeneration exte
         slaEvent = slas.getSLACalculator().get(jobId);
         slaEvent.setEventProcessed(0); //resetting to receive sla events
         ehs.new EventWorker().run();
-        Thread.sleep(300); //time for event listeners to run
+        Thread.sleep(300); // time for listeners to run
         slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
         assertEquals(jobId, slaEvent.getId());
         assertNotNull(slaEvent.getActualStart());
@@ -189,8 +189,9 @@ public class TestSLAEventGeneration exte
 
         // test that sla processes the Job Event from Kill command
         new KillXCommand(jobId).call();
+        ehs.getEventQueue().poll(); //ignore the wf-action event generated
         ehs.new EventWorker().run();
-        Thread.sleep(300);
+        Thread.sleep(300); // time for listeners to run
         slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
         assertEquals(jobId, slaEvent.getId());
         assertNotNull(slaEvent.getActualEnd());
@@ -268,7 +269,6 @@ public class TestSLAEventGeneration exte
         slaEvent = slas.getSLACalculator().get(actionId);
         slaEvent.setEventProcessed(0); //resetting for testing sla event
         ehs.new EventWorker().run();
-        Thread.sleep(300); //time for event listeners to run
         slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
         assertEquals(actionId, slaEvent.getId());
         assertNotNull(slaEvent.getActualStart());
@@ -278,7 +278,6 @@ public class TestSLAEventGeneration exte
         // test that sla processes the Job Event from Kill command
         new CoordKillXCommand(jobId).call();
         ehs.new EventWorker().run();
-        Thread.sleep(300); //time for event listeners to run
         slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
         assertEquals(actionId, slaEvent.getId());
         assertNotNull(slaEvent.getActualEnd());

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java Wed Jun  5 18:34:19 2013
@@ -30,6 +30,7 @@ import org.apache.oozie.event.Coordinato
 import org.apache.oozie.event.CoordinatorJobEvent;
 import org.apache.oozie.event.WorkflowActionEvent;
 import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.event.listener.JobEventListener;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
@@ -54,6 +55,7 @@ public class TestSLAJobEventListener ext
         Configuration conf = services.getConf();
         conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService,"
                 + "org.apache.oozie.sla.service.SLAService");
+        conf.setClass(EventHandlerService.CONF_LISTENERS, SLAJobEventListener.class, JobEventListener.class);
         services.init();
     }
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Wed Jun  5 18:34:19 2013
@@ -88,6 +88,7 @@ import org.jdom.JDOMException;
 
 import com.google.common.annotations.VisibleForTesting;
 
+@SuppressWarnings("deprecation")
 public abstract class XDataTestCase extends XHCatTestCase {
 
     protected static String slaXml = " <sla:info xmlns:sla='uri:oozie:sla:0.1'>"
@@ -717,12 +718,17 @@ public abstract class XDataTestCase exte
      */
     protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status)
             throws Exception {
-        return addRecordToWfActionTable(wfId, actionName, status, "");
+        return addRecordToWfActionTable(wfId, actionName, status, "", false);
     }
 
     protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status,
-            String execPath) throws Exception {
-        WorkflowActionBean action = createWorkflowAction(wfId, actionName, status);
+            boolean pending) throws Exception {
+        return addRecordToWfActionTable(wfId, actionName, status, "", pending);
+    }
+
+    protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status,
+            String execPath, boolean pending) throws Exception {
+        WorkflowActionBean action = createWorkflowAction(wfId, actionName, status, pending);
         action.setExecutionPath(execPath);
         try {
             JPAService jpaService = Services.get().get(JPAService.class);
@@ -1127,8 +1133,8 @@ public abstract class XDataTestCase exte
      * @return workflow action bean
      * @throws Exception thrown if unable to create workflow action bean
      */
-    protected WorkflowActionBean createWorkflowAction(String wfId, String actionName, WorkflowAction.Status status)
-            throws Exception {
+    protected WorkflowActionBean createWorkflowAction(String wfId, String actionName, WorkflowAction.Status status,
+            boolean pending) throws Exception {
         WorkflowActionBean action = new WorkflowActionBean();
         action.setName(actionName);
         action.setId(Services.get().get(UUIDService.class).generateChildId(wfId, actionName));
@@ -1139,7 +1145,13 @@ public abstract class XDataTestCase exte
         action.setStartTime(new Date());
         action.setEndTime(new Date());
         action.setLastCheckTime(new Date());
-        action.resetPendingOnly();
+        action.setCred("null");
+        if (pending) {
+            action.setPendingOnly();
+        }
+        else {
+            action.resetPendingOnly();
+        }
 
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
@@ -1163,6 +1175,11 @@ public abstract class XDataTestCase exte
         return action;
     }
 
+    protected WorkflowActionBean createWorkflowAction(String wfId, String actionName, WorkflowAction.Status status)
+            throws Exception {
+        return createWorkflowAction(wfId, actionName, status, false);
+    }
+
     /**
      * Create bundle job bean
      *

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Jun  5 18:34:19 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1375 Generate Job notification events for Workflow Actions (mona)
 OOZIE-1357 Can't view more than 1000 actions of a coordinator and paging does not work (ryota)
 OOZIE-1381 Oozie does not support access to the distributed cache file under different name node (ryota)
 OOZIE-1298 TestPartitionDependencyManagerEhcache.testEvictionOnTimeToIdle is flakey (rohini)