You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/06/21 04:05:57 UTC

svn commit: r1495272 [1/2] - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/command/wf/ core/src/main/java/org...

Author: virag
Date: Fri Jun 21 02:05:56 2013
New Revision: 1495272

URL: http://svn.apache.org/r1495272
Log:
OOZIE-1424 Improve SLA reliability on restart, fix bugs related to SLA and event generation (virag)

Added:
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java
    oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java
    oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
    oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Fri Jun 21 02:05:56 2013
@@ -74,6 +74,8 @@ import org.apache.openjpa.persistence.jd
         // Select query used only by test cases
         @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
 
+        // Select query used by SLAService on restart
+        @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.status, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
         // Select query used by ActionInfo command
         @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
         // Select Query used by Timeout command
@@ -266,6 +268,14 @@ public class CoordinatorActionBean exten
         return Status.valueOf(status);
     }
 
+    /**
+     * Return the status as a string
+     * @return the raw status string, without conversion to the Status enum
+     */
+    public String getStatusStr() {
+        return status;
+    }
+
     @Override
     public void setStatus(Status status) {
         super.setStatus(status);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java Fri Jun 21 02:05:56 2013
@@ -117,6 +117,27 @@ public class DagEngine extends BaseEngin
     }
 
     /**
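+     * Submit and start a workflow on behalf of a coordinator. It validates configuration properties.
+     * @param conf job conf
+     * @param parentId id of the workflow's parent, passed to the start command
+     * @return the id of the submitted workflow job
+     * @throws DagEngineException if the submit or start command fails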
+     */
+    public String submitJobFromCoordinator(Configuration conf, String parentId) throws DagEngineException {
+        validateSubmitConfiguration(conf);
+        try {
+            String jobId;
+            SubmitXCommand submit = new SubmitXCommand(conf);
+            jobId = submit.call();
+            new StartXCommand(jobId, parentId).call();
+            return jobId;
+        }
+        catch (CommandException ex) {
+            throw new DagEngineException(ex);
+        }
+    }
+
+    /**
      * Submit a pig/hive/mapreduce job through HTTP.
      * <p/>
      * It validates configuration properties.

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java Fri Jun 21 02:05:56 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,6 +59,8 @@ import org.apache.openjpa.persistence.jd
 
     @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
 
+    @NamedQuery(name = "GET_ACTION_FOR_SLA", query = "select a.id, a.status, a.startTimestamp, a.endTimestamp from WorkflowActionBean a where a.id = :id"),
+
     @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
 
     @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
@@ -219,6 +221,26 @@ public class WorkflowActionBean extends 
     }
 
     /**
+     * Return whether the workflow action is in a terminal state or not
+     *
+     * @return true if the status is ERROR, FAILED, KILLED or OK, false otherwise
+     */
+    public boolean inTerminalState() {
+        boolean isTerminalState = false;
+        switch (WorkflowAction.Status.valueOf(status)) {
+            case ERROR:
+            case FAILED:
+            case KILLED:
+            case OK:
+                isTerminalState = true;
+                break;
+            default:
+                break;
+        }
+        return isTerminalState;
+    }
+
+    /**
      * Return if the action execution is complete.
      *
      * @return if the action start is complete.
@@ -238,7 +260,7 @@ public class WorkflowActionBean extends 
         return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
                 || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
     }
-    
+
     /**
      * Return true if the action is USER_RETRY
      *

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java Fri Jun 21 02:05:56 2013
@@ -63,6 +63,8 @@ import org.apache.openjpa.persistence.jd
 
     @NamedQuery(name = "GET_WORKFLOW_FOR_UPDATE", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),
 
+    @NamedQuery(name = "GET_WORKFLOW_FOR_SLA", query = "select w.id, w.status, w.startTimestamp, w.endTimestamp from WorkflowJobBean w where w.id = :id"),
+
     @NamedQuery(name = "GET_WORKFLOW_ID_FOR_EXTERNAL_ID", query = "select  w.id from WorkflowJobBean w where w.externalId = :externalId"),
 
     @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS", query = "select count(w) from WorkflowJobBean w where w.status = :status"),
@@ -192,6 +194,20 @@ public class WorkflowJobBean extends Jso
         setProtoActionConf(protoActionConf);
     }
 
+    public boolean inTerminalState() {
+        boolean inTerminalState = false;
+        switch (WorkflowJob.Status.valueOf(status)) {
+            case FAILED:
+            case KILLED:
+            case SUCCEEDED:
+                inTerminalState = true;
+                break;
+            default:
+                break;
+        }
+        return inTerminalState;
+    }
+
     public String getLogToken() {
         return logToken;
     }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java Fri Jun 21 02:05:56 2013
@@ -82,7 +82,7 @@ public abstract class TransitionXCommand
             if (actionBean instanceof CoordinatorActionBean) {
                 CoordinatorActionBean caBean = (CoordinatorActionBean) actionBean;
                 caBean.setJobId(coordJob.getId());
-                CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(), coordJob.getAppName());
+                CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(), coordJob.getAppName(), null);
             }
             // TODO generate Coord Job event
         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Fri Jun 21 02:05:56 2013
@@ -45,6 +45,8 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 
 /**
@@ -55,6 +57,8 @@ public class CoordActionCheckXCommand ex
     private String actionId;
     private int actionCheckDelay;
     private CoordinatorActionBean coordAction = null;
+    private CoordinatorJobBean coordJob;
+    private WorkflowJobBean workflowJob;
     private JPAService jpaService = null;
     private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
@@ -122,9 +126,7 @@ public class CoordActionCheckXCommand ex
             jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
             CoordinatorAction.Status endStatus = coordAction.getStatus();
             if (endStatus != initialStatus && EventHandlerService.isEnabled()) {
-                CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
-                        coordAction.getJobId()));
-                generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+                generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime());
             }
         }
         catch (XException ex) {
@@ -160,6 +162,9 @@ public class CoordActionCheckXCommand ex
 
             if (jpaService != null) {
                 coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId));
+                coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
+                        coordAction.getJobId()));
+                workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId()));
                 LogUtils.setLogInfo(coordAction, logInfo);
             }
             else {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Fri Jun 21 02:05:56 2013
@@ -215,7 +215,7 @@ public class CoordActionInputCheckXComma
                     if (EventHandlerService.isEnabled()
                             && coordAction.getStatus() != CoordinatorAction.Status.READY) {
                         //since event is not to be generated unless action RUNNING via StartX
-                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
                     }
                 }
                 else {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java Fri Jun 21 02:05:56 2013
@@ -165,7 +165,6 @@ public class CoordActionStartXCommand ex
             // XmlUtils.prettyPrint(runConf).toString());
             DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
             try {
-                boolean startJob = true;
                 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
                 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
                         SlaAppType.COORDINATOR_ACTION, log);
@@ -175,7 +174,7 @@ public class CoordActionStartXCommand ex
 
                 // Normalize workflow appPath here;
                 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
-                String wfId = dagEngine.submitJob(conf, startJob);
+                String wfId = dagEngine.submitJobFromCoordinator(conf, actionId);
                 coordAction.setStatus(CoordinatorAction.Status.RUNNING);
                 coordAction.setExternalId(wfId);
                 coordAction.incrementAndGetPending();
@@ -192,7 +191,7 @@ public class CoordActionStartXCommand ex
                     try {
                         jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
                         if (EventHandlerService.isEnabled()) {
-                            generateEvent(coordAction, user, appName);
+                            generateEvent(coordAction, user, appName, wfJob.getStartTime());
                         }
                     }
                     catch (JPAExecutorException je) {
@@ -248,7 +247,7 @@ public class CoordActionStartXCommand ex
                         // call JPAExecutor to do the bulk writes
                         jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
                         if (EventHandlerService.isEnabled()) {
-                            generateEvent(coordAction, user, appName);
+                            generateEvent(coordAction, user, appName, null);
                         }
                     }
                     catch (JPAExecutorException je) {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java Fri Jun 21 02:05:56 2013
@@ -60,7 +60,7 @@ public class CoordActionTimeOutXCommand 
                 actionBean.setLastModifiedTime(new Date());
                 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(actionBean));
                 if (EventHandlerService.isEnabled()) {
-                    generateEvent(actionBean, user, appName);
+                    generateEvent(actionBean, user, appName, null);
                 }
             }
             catch (JPAExecutorException e) {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Fri Jun 21 02:05:56 2013
@@ -48,6 +48,7 @@ import org.apache.oozie.executor.jpa.JPA
 public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
     private WorkflowJobBean workflow;
     private CoordinatorActionBean coordAction = null;
+    private CoordinatorJobBean coordJob;
     private JPAService jpaService = null;
     private int maxRetries = 1;
     private List<JsonBean> updateList = new ArrayList<JsonBean>();
@@ -69,7 +70,6 @@ public class CoordActionUpdateXCommand e
         try {
             LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
             Status slaStatus = null;
-            CoordinatorAction.Status preCoordStatus = coordAction.getStatus();
             if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
                 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
                 coordAction.setPending(0);
@@ -109,7 +109,7 @@ public class CoordActionUpdateXCommand e
                 return null;
             }
 
-            LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status from " + preCoordStatus
+            LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status "
                     + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());
 
             coordAction.setLastModifiedTime(new Date());
@@ -134,10 +134,8 @@ public class CoordActionUpdateXCommand e
             }
 
             jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
-            if (preCoordStatus != coordAction.getStatus() && EventHandlerService.isEnabled()) {
-                CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
-                        coordAction.getJobId()));
-                generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+            if (EventHandlerService.isEnabled()) {
+                generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime());
             }
 
             LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
@@ -180,6 +178,8 @@ public class CoordActionUpdateXCommand e
             try {
                 coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(workflow.getId()));
                 if (coordAction != null) {
+                    coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
+                            coordAction.getJobId()));
                     break;
                 }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Fri Jun 21 02:05:56 2013
@@ -169,9 +169,6 @@ public class CoordKillXCommand extends K
     public void performWrites() throws CommandException {
         try {
             jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
-            if (EventHandlerService.isEnabled()) {
-                generateEvents(coordJob);
-            }
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Fri Jun 21 02:05:56 2013
@@ -116,7 +116,7 @@ public class CoordMaterializeTransitionX
                 if (actionBean instanceof CoordinatorActionBean) {
                     CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
                     if (EventHandlerService.isEnabled()) {
-                        CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+                        CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
                     }
                     if (coordAction.getPushMissingDependencies() != null) {
                         // TODO: Delay in catchup mode?

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Fri Jun 21 02:05:56 2013
@@ -256,7 +256,7 @@ public class CoordPushDependencyCheckXCo
                     if (EventHandlerService.isEnabled()
                             && coordAction.getStatus() != CoordinatorAction.Status.READY) {
                         //since event is not to be generated unless action RUNNING via StartX
-                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
                     }
                 }
                 else {

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Fri Jun 21 02:05:56 2013
@@ -399,7 +399,6 @@ public class CoordRerunXCommand extends 
                 coordJob.resetPending();
             }
         }
-
         updateList.add(coordJob);
     }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java Fri Jun 21 02:05:56 2013
@@ -181,9 +181,6 @@ public class CoordResumeXCommand extends
     public void performWrites() throws CommandException {
         try {
             jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
-            if (EventHandlerService.isEnabled()) {
-                generateEvents(coordJob);
-            }
         }
         catch (JPAExecutorException e) {
             throw new CommandException(e);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java Fri Jun 21 02:05:56 2013
@@ -186,9 +186,6 @@ public class CoordSuspendXCommand extend
     public void performWrites() throws CommandException {
         try {
             jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
-            if (EventHandlerService.isEnabled()) {
-                generateEvents(coordJob);
-            }
         }
         catch (JPAExecutorException jex) {
             throw new CommandException(jex);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java Fri Jun 21 02:05:56 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.oozie.command.coord;
 
+import java.util.Date;
+
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.AppType;
@@ -53,7 +55,7 @@ public abstract class CoordinatorXComman
         super(name, type, priority, dryrun);
     }
 
-    public static void generateEvent(CoordinatorActionBean coordAction, String user, String appName) {
+    public static void generateEvent(CoordinatorActionBean coordAction, String user, String appName, Date startTime) {
         if (eventService.isSupportedApptype(AppType.COORDINATOR_ACTION.name())) {
             String missDep = coordAction.getMissingDependencies();
             if (missDep != null && missDep.length() > 0) {
@@ -66,7 +68,7 @@ public abstract class CoordinatorXComman
             String deps = missDep == null ? (pushMissDep == null ? null : pushMissDep) : (pushMissDep == null ? missDep
                     : missDep + CoordELFunctions.INSTANCE_SEPARATOR + pushMissDep);
             CoordinatorActionEvent event = new CoordinatorActionEvent(coordAction.getId(), coordAction.getJobId(),
-                    coordAction.getStatus(), user, appName, coordAction.getNominalTime(), coordAction.getCreatedTime(),
+                    coordAction.getStatus(), user, appName, coordAction.getNominalTime(), startTime,
                     deps);
             event.setErrorCode(coordAction.getErrorCode());
             event.setErrorMessage(coordAction.getErrorMessage());

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Fri Jun 21 02:05:56 2013
@@ -71,6 +71,8 @@ public class SignalXCommand extends Work
     private JPAService jpaService = null;
     private String jobId;
     private String actionId;
+    private String parentId;
+    private CoordinatorActionBean coordAction;
     private WorkflowJobBean wfJob;
     private WorkflowActionBean wfAction;
     private List<JsonBean> updateList = new ArrayList<JsonBean>();
@@ -80,13 +82,14 @@ public class SignalXCommand extends Work
     private String wfJobErrorMsg;
 
 
+    /**
+     * Create a signal command for a workflow job.
+     *
+     * @param name command name; it is also passed as the second argument of the superclass constructor
+     * @param priority command priority
+     * @param jobId workflow job id, checked with ParamChecker.notEmpty
+     * @param parentId parent id kept on the command; may be null (the two-argument constructor passes null)
+     */
-    public SignalXCommand(String name, int priority, String jobId) {
+    public SignalXCommand(String name, int priority, String jobId, String parentId) {
         super(name, name, priority);
         this.jobId = ParamChecker.notEmpty(jobId, "jobId");
+        this.parentId = parentId;
     }
 
+    /**
+     * Create a "signal" command with priority 1 for an action of a workflow job, without a parent id.
+     *
+     * @param jobId workflow job id
+     * @param actionId workflow action id, checked with ParamChecker.notEmpty
+     */
     public SignalXCommand(String jobId, String actionId) {
-        this("signal", 1, jobId);
+        this("signal", 1, jobId, null);
         this.actionId = ParamChecker.notEmpty(actionId, "actionId");
     }
 
@@ -109,6 +112,8 @@ public class SignalXCommand extends Work
                 LogUtils.setLogInfo(wfJob, logInfo);
                 if (actionId != null) {
                     this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
+                    coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(wfJob
+                            .getId()));
                     LogUtils.setLogInfo(wfAction, logInfo);
                 }
             }
@@ -326,11 +331,12 @@ public class SignalXCommand extends Work
             // call JPAExecutor to do the bulk writes
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
             if (generateEvent && EventHandlerService.isEnabled()) {
-                CoordinatorActionBean coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(wfJob
-                        .getId()));
                 if (coordAction != null) {
                     wfJob.setParentId(coordAction.getId());
                 }
+                else {
+                    wfJob.setParentId(parentId);
+                }
                 generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
             }
         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java Fri Jun 21 02:05:56 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,7 +25,11 @@ import org.apache.oozie.util.InstrumentU
 public class StartXCommand extends SignalXCommand {
 
+    /**
+     * Create a "start" command for a workflow job, without a parent id.
+     *
+     * @param id workflow job id
+     */
     public StartXCommand(String id) {
-        super("start", 1, id);
+        this(id, null);
+    }
+
+    /**
+     * Create a "start" command with priority 1 and increment the job counter for this command's name.
+     *
+     * @param jobId workflow job id
+     * @param parentId parent id handed to the SignalXCommand constructor; may be null
+     */
+    public StartXCommand(String jobId, String parentId) {
+        super("start", 1, jobId, parentId);
         InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
     }
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java Fri Jun 21 02:05:56 2013
@@ -18,7 +18,7 @@
 package org.apache.oozie.event;
 
 import java.io.Serializable;
-import java.util.Set;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.event.Event;
@@ -54,7 +54,7 @@ public interface EventQueue {
      * Fetch events from queue in batch
-     * @return events set
+     * @return events list
      */
-    public Set<Event> pollBatch();
+    public List<Event> pollBatch();
 
     /**
     * Fetch single event from queue

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java Fri Jun 21 02:05:56 2013
@@ -18,8 +18,8 @@
 
 package org.apache.oozie.event;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -74,9 +74,9 @@ public class MemoryEventQueue implements
     }
 
     @Override
-    public Set<Event> pollBatch() {
+    public List<Event> pollBatch() {
         // batch drain
-        Set<Event> eventBatch = new HashSet<Event>();
+        List<Event> eventBatch = new ArrayList<Event>();
         for (int i = 0; i < batchSize; i++) {
             EventQueueElement polled = eventQueue.poll();
             if (polled != null) {

Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java?rev=1495272&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java Fri Jun 21 02:05:56 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * JPAExecutor that loads the CoordinatorActionBean attributes SLAService needs when it restarts
+ */
+public class CoordActionGetForSLAJPAExecutor implements JPAExecutor<CoordinatorActionBean> {
+
+    private String coordActionId;
+
+    public CoordActionGetForSLAJPAExecutor(String coordActionId) {
+        ParamChecker.notNull(coordActionId, "coordActionId");
+        this.coordActionId = coordActionId;
+    }
+
+    @Override
+    public String getName() {
+        return "CoordActionGetForSLAJPAExecutor";
+    }
+
+    @Override
+    public CoordinatorActionBean execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query query = em.createNamedQuery("GET_COORD_ACTION_FOR_SLA");
+            query.setParameter("id", coordActionId);
+            return toBean((Object[]) query.getSingleResult());
+        }
+        catch (Exception ex) {
+            // every failure, whether in the lookup or in the conversion, is reported as E0603
+            throw new JPAExecutorException(ErrorCode.E0603, ex.getMessage(), ex);
+        }
+    }
+
+    // Builds a bean from one result row, copying only the non-null columns, in order:
+    // id, job id, status, external id, last modified time.
+    private CoordinatorActionBean toBean(Object[] row) {
+        CoordinatorActionBean action = new CoordinatorActionBean();
+        if (row[0] != null) {
+            action.setId((String) row[0]);
+        }
+        if (row[1] != null) {
+            action.setJobId((String) row[1]);
+        }
+        if (row[2] != null) {
+            action.setStatus(CoordinatorAction.Status.valueOf((String) row[2]));
+        }
+        if (row[3] != null) {
+            action.setExternalId((String) row[3]);
+        }
+        if (row[4] != null) {
+            action.setLastModifiedTime(DateUtils.toDate((Timestamp) row[4]));
+        }
+        return action;
+    }
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java?rev=1495272&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java Fri Jun 21 02:05:56 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * JPAExecutor that loads the WorkflowActionBean attributes used by the SLA service
+ */
+public class WorkflowActionGetForSLAJPAExecutor implements JPAExecutor<WorkflowActionBean> {
+
+    private String wfActionId;
+
+    public WorkflowActionGetForSLAJPAExecutor(String wfActionId) {
+        ParamChecker.notNull(wfActionId, "wfActionId");
+        this.wfActionId = wfActionId;
+    }
+
+    @Override
+    public String getName() {
+        return "WorkflowActionGetForSLAJPAExecutor";
+    }
+
+    @Override
+    public WorkflowActionBean execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query query = em.createNamedQuery("GET_ACTION_FOR_SLA");
+            query.setParameter("id", wfActionId);
+            return toBean((Object[]) query.getSingleResult());
+        }
+        catch (Exception ex) {
+            throw new JPAExecutorException(ErrorCode.E0603, ex.getMessage(), ex);
+        }
+    }
+
+    // Builds a bean from one result row; non-null columns are copied in order: id, status, start time, end time.
+    private WorkflowActionBean toBean(Object[] row) {
+        WorkflowActionBean action = new WorkflowActionBean();
+        if (row[0] != null) {
+            action.setId((String) row[0]);
+        }
+        if (row[1] != null) {
+            action.setStatus(WorkflowAction.Status.valueOf((String) row[1]));
+        }
+        if (row[2] != null) {
+            action.setStartTime(DateUtils.toDate((Timestamp) row[2]));
+        }
+        if (row[3] != null) {
+            action.setEndTime(DateUtils.toDate((Timestamp) row[3]));
+        }
+        return action;
+    }
+}
\ No newline at end of file

Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java?rev=1495272&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java Fri Jun 21 02:05:56 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * JPAExecutor that loads the WorkflowJobBean attributes used by the SLA service
+ */
+public class WorkflowJobGetForSLAJPAExecutor implements JPAExecutor<WorkflowJobBean> {
+
+    private String wfJobId;
+
+    public WorkflowJobGetForSLAJPAExecutor(String wfJobId) {
+        ParamChecker.notNull(wfJobId, "wfJobId");
+        this.wfJobId = wfJobId;
+    }
+
+    @Override
+    public String getName() {
+        return "WorkflowJobGetForSLAJPAExecutor";
+    }
+
+    @Override
+    public WorkflowJobBean execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query query = em.createNamedQuery("GET_WORKFLOW_FOR_SLA");
+            query.setParameter("id", wfJobId);
+            return toBean((Object[]) query.getSingleResult());
+        }
+        catch (Exception ex) {
+            throw new JPAExecutorException(ErrorCode.E0603, ex.getMessage(), ex);
+        }
+    }
+
+    // Builds a bean from one result row; non-null columns are copied in order: id, status, start time, end time.
+    private WorkflowJobBean toBean(Object[] row) {
+        WorkflowJobBean job = new WorkflowJobBean();
+        if (row[0] != null) {
+            job.setId((String) row[0]);
+        }
+        if (row[1] != null) {
+            job.setStatus(WorkflowJob.Status.valueOf((String) row[1]));
+        }
+        if (row[2] != null) {
+            job.setStartTime(DateUtils.toDate((Timestamp) row[2]));
+        }
+        if (row[3] != null) {
+            job.setEndTime(DateUtils.toDate((Timestamp) row[3]));
+        }
+        return job;
+    }
+}
\ No newline at end of file

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java Fri Jun 21 02:05:56 2013
@@ -84,7 +84,8 @@ public class JMSJobEventListener extends
                     textMessage.setStringProperty(property.getKey(), property.getValue());
                 }
                 textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat);
-                LOG.trace("Event related JMS message [{0}]", textMessage.toString());
+                LOG.trace("Event related JMS text body [{0}]", textMessage.getText());
+                LOG.trace("Event related JMS entire message [{0}]", textMessage.toString());
                 MessageProducer producer = jmsContext.createProducer(session, topicName);
                 producer.setDeliveryMode(jmsDeliveryMode);
                 producer.setTimeToLive(jmsExpirationDate);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java Fri Jun 21 02:05:56 2013
@@ -224,7 +224,7 @@ public class EventHandlerService impleme
             }
             try {
                 if (!eventQueue.isEmpty()) {
-                    Set<Event> work = eventQueue.pollBatch();
+                    List<Event> work = eventQueue.pollBatch();
                     for (Event event : work) {
                         MessageType msgType = event.getMsgType();
                         List<?> listeners = listenerMap.get(msgType);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java Fri Jun 21 02:05:56 2013
@@ -266,7 +266,7 @@ public class SLACalcStatus extends SLAEv
 
     @Override
     public String toString() {
-        return "ID: " + getId() + " SLAStatus: " + slaStatus + " EventProcessed: "+eventProcessed;
+        return "ID: " + getId() + " SLAStatus: " + slaStatus + " EventStatus: "+eventStatus + "AppType " + getAppType();
     }
 
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Fri Jun 21 02:05:56 2013
@@ -30,11 +30,22 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.AppType;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.event.JobEvent;
 import org.apache.oozie.client.event.SLAEvent.EventStatus;
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
 import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
+
 import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
@@ -48,6 +59,7 @@ import org.apache.oozie.service.Services
 import org.apache.oozie.sla.service.SLAService;
 import org.apache.oozie.util.XLog;
 
+
 /**
  * Implementation class for SLACalculator that calculates SLA related to
  * start/end/duration of jobs using a memory-based map
@@ -77,32 +89,159 @@ public class SLACalculatorMemory impleme
     }
 
     private void loadOnRestart() {
+        boolean isJobModified = false;
         try {
             List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
                     modifiedAfter));
             for (SLASummaryBean summaryBean : summaryBeans) {
                 String jobId = summaryBean.getJobId();
-                if (summaryBean.getEventProcessed() == 7) {
-                    historySet.add(jobId);
+                try {
+                    switch (summaryBean.getAppType()) {
+                        case COORDINATOR_ACTION:
+                            isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
+                            break;
+                        case WORKFLOW_ACTION:
+                            isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
+                            break;
+                        case WORKFLOW_JOB:
+                            isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
+                            break;
+                        default:
+                            break;
+                    }
+                    if (isJobModified) {
+                        jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(summaryBean));
+                    }
                 }
-                else {
-                    try {
+                catch (Exception e) {
+                    XLog.getLog(SLAService.class).warn("Failed to load records for " + jobId, e);
+                }
+                try {
+                    if (summaryBean.getEventProcessed() == 7) {
+                        historySet.add(jobId);
+                    }
+                    else if (summaryBean.getEventProcessed() <= 7) {
                         SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
                                 jobId));
                         SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
                         slaMap.put(jobId, slaCalcStatus);
                     }
-                    catch (JPAExecutorException e) {
-                        XLog.getLog(SLAService.class).warn("Cannot retrieve registration record for " + jobId, e);
-                    }
                 }
+                catch (Exception e) {
+                    XLog.getLog(SLAService.class).warn("Failed to fetch/update records for " + jobId, e);
+                }
+
             }
+
         }
-        catch (JPAExecutorException e) {
+        catch (Exception e) {
             XLog.getLog(SLAService.class).warn("Failed to retrieve SLASummary records on restart", e);
         }
     }
 
+    /**
+     * Reconcile an SLA summary bean with the current state of its coordinator action after a restart.
+     * <p>
+     * Nothing is changed when the action status equals the job status stored in the summary. Otherwise the stored
+     * status is replaced and, unless the action is WAITING, the start/end information is refreshed from the
+     * workflow job referenced by the action's external id. An action without an external id is handled without
+     * a workflow lookup.
+     *
+     * @param summaryBean summary record, updated in place
+     * @param jobId coordinator action id
+     * @return true if the summary bean was modified
+     * @throws JPAExecutorException if the coordinator action or its workflow job cannot be loaded
+     */
+    private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId)
+            throws JPAExecutorException {
+        CoordinatorActionBean coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId));
+        if (coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
+            return false;
+        }
+        XLog.getLog(SLAService.class).trace(
+                "Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is "
+                        + summaryBean.getJobStatus());
+        summaryBean.setJobStatus(coordAction.getStatusStr());
+        // WorkflowJobGetForSLAJPAExecutor checks its id with ParamChecker.notNull, so the workflow job is only
+        // looked up when the action actually references one
+        String wfJobId = coordAction.getExternalId();
+        if (coordAction.isTerminalStatus()) {
+            if (wfJobId != null) {
+                WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(wfJobId));
+                setSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(),
+                        coordAction.getStatusStr());
+            }
+            else {
+                // the action ended without a workflow job: there is no actual start or duration to record;
+                // only the end is stored and, as in the terminal-state helper, the SLA status is set to MISS
+                // when the previous event-processed value is below 4
+                byte eventProc = summaryBean.getEventProcessed();
+                summaryBean.setEventProcessed(8);
+                summaryBean.setActualEnd(coordAction.getLastModifiedTime());
+                if (eventProc < 4) {
+                    summaryBean.setSLAStatus(SLAStatus.MISS);
+                }
+            }
+        }
+        else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING && wfJobId != null) {
+            WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(wfJobId));
+            setSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
+        }
+        return true;
+    }
+
+    /**
+     * Reconcile an SLA summary bean with the current state of its workflow action after a restart.
+     *
+     * @param summaryBean summary record, updated in place when the action status differs from its job status
+     * @param jobId workflow action id
+     * @return true if the summary bean was modified
+     * @throws JPAExecutorException if the workflow action cannot be loaded
+     */
+    private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId)
+            throws JPAExecutorException {
+        WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId));
+        if (wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
+            return false;
+        }
+        XLog.getLog(SLAService.class).trace(
+                "Workflow action status is " + wfAction.getStatusStr() + " and summary bean status is "
+                        + summaryBean.getJobStatus());
+        summaryBean.setJobStatus(wfAction.getStatusStr());
+        if (wfAction.inTerminalState()) {
+            setSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
+        }
+        else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
+            // any state other than PREP is handled as "started"
+            setSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime());
+        }
+        return true;
+    }
+
+    /**
+     * Reconcile an SLA summary bean with the current state of its workflow job after a restart.
+     *
+     * @param summaryBean summary record, updated in place when the job status differs from its stored job status
+     * @param jobId workflow job id
+     * @return true if the summary bean was modified
+     * @throws JPAExecutorException if the workflow job cannot be loaded
+     */
+    private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId)
+            throws JPAExecutorException {
+        WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
+        if (wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
+            return false;
+        }
+        XLog.getLog(SLAService.class).trace(
+                "Workflow job status is " + wfJob.getStatusStr() + " and summary bean status is "
+                        + summaryBean.getJobStatus());
+        summaryBean.setJobStatus(wfJob.getStatusStr());
+        if (wfJob.inTerminalState()) {
+            setSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
+        }
+        else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
+            // any state other than PREP is handled as "started"
+            setSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
+        }
+        return true;
+    }
+
+    /**
+     * Update a summary bean for a job that was found in a terminal state on restart.
+     * <p>
+     * Sets the event-processed value to 8 and stores the actual start and end. The actual duration is stored only
+     * when a start time is known. When the previous event-processed value is below 4, the SLA status becomes MET
+     * if the job succeeded no later than the expected end, and MISS otherwise.
+     *
+     * @param summaryBean summary record, updated in place
+     * @param startTime actual start time; may be null for a job that ended without starting
+     * @param endTime actual end time; expected to be non-null
+     * @param status name of the job's final status
+     */
+    private void setSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) {
+        byte eventProc = summaryBean.getEventProcessed();
+        summaryBean.setEventProcessed(8);
+        summaryBean.setActualStart(startTime);
+        summaryBean.setActualEnd(endTime);
+        if (startTime != null) {
+            // without a start time there is no duration to compute
+            summaryBean.setActualDuration(endTime.getTime() - startTime.getTime());
+        }
+        if (eventProc < 4) {
+            boolean succeeded = status.equals(WorkflowJob.Status.SUCCEEDED.name())
+                    || status.equals(WorkflowAction.Status.OK.name())
+                    || status.equals(CoordinatorAction.Status.SUCCEEDED.name());
+            if (succeeded && endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) {
+                summaryBean.setSLAStatus(SLAStatus.MET);
+            }
+            else {
+                summaryBean.setSLAStatus(SLAStatus.MISS);
+            }
+        }
+    }
+
+    /**
+     * Update a summary bean for a job that was found started but not finished on restart: set the lowest bit
+     * of the event-processed value if it is unset, move the SLA status from NOT_STARTED to IN_PROCESS, and store
+     * the actual start time.
+     */
+    private void setSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) {
+        boolean lowestBitUnset = (eventProc & 1) == 0;
+        if (lowestBitUnset) {
+            // with the bit unset, OR-ing it in yields the same value as adding 1
+            summaryBean.setEventProcessed(eventProc | 1);
+        }
+        boolean notStarted = summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED);
+        if (notStarted) {
+            summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
+        }
+        summaryBean.setActualStart(startTime);
+    }
+
     @Override
     public int size() {
         return slaMap.size();
@@ -161,7 +300,11 @@ public class SLACalculatorMemory impleme
 
             }
             if (((eventProc >> 1) & 1) == 0) { // check if second bit (duration-processed) is unset
-                if (slaCalc.getActualStart() != null) {
+                if (reg.getExpectedDuration() == -1) {
+                    eventProc += 2;
+                    change = true;
+                }
+                else if (slaCalc.getActualStart() != null) {
                     if (reg.getExpectedDuration() + jobEventLatency < Calendar.getInstance(TimeZone.getTimeZone("UTC"))
                             .getTimeInMillis() - slaCalc.getActualStart().getTime()) {
                         slaCalc.setEventStatus(EventStatus.DURATION_MISS);
@@ -226,6 +369,7 @@ public class SLACalculatorMemory impleme
             if (slaMap.size() < capacity) {
                 SLACalcStatus slaCalc = new SLACalcStatus(reg);
                 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
+                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
                 slaMap.put(jobId, slaCalc);
                 List<JsonBean> insertList = new ArrayList<JsonBean>();
                 insertList.add(reg);
@@ -246,6 +390,24 @@ public class SLACalculatorMemory impleme
         return false;
     }
 
+    /**
+     * Return the status name given to a job when it is first put into the SLA map: WAITING for a coordinator
+     * action, PREP for a workflow action or workflow job, and null for any other app type.
+     */
+    private String getJobStatus(AppType appType) {
+        switch (appType) {
+            case COORDINATOR_ACTION:
+                return CoordinatorAction.Status.WAITING.name();
+            case WORKFLOW_ACTION:
+                return WorkflowAction.Status.PREP.name();
+            case WORKFLOW_JOB:
+                return WorkflowJob.Status.PREP.name();
+            default:
+                return null;
+        }
+    }
+
     /**
      * Update job into the map for SLA tracking
      */
@@ -255,6 +417,7 @@ public class SLACalculatorMemory impleme
             if (slaMap.size() < capacity) {
                 SLACalcStatus slaCalc = new SLACalcStatus(reg);
                 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
+                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
                 slaMap.put(jobId, slaCalc);
                 List<JsonBean> updateList = new ArrayList<JsonBean>();
                 updateList.add(reg);
@@ -299,8 +462,8 @@ public class SLACalculatorMemory impleme
                         slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
                         break;
                     default:
-                        XLog.getLog(SLAService.class).debug("Unknown Job Status [{0}]", jobEventStatus);
-                        return false;
+                        XLog.getLog(SLAService.class).debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
+                        slaInfo = getSLASummaryBean(slaCalc);
                 }
 
                 if (slaCalc.getEventProcessed() == 7) {
@@ -382,15 +545,16 @@ public class SLACalculatorMemory impleme
         //check event proc
         byte eventProc = slaCalc.getEventProcessed();
         if (((eventProc >> 1) & 1) == 0) {
-            if (actualDuration > expectedDuration) {
+            if (expectedDuration != -1 && actualDuration > expectedDuration) {
                 slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
             }
-            else {
+            else if (expectedDuration != -1 && actualDuration <= expectedDuration) {
                 slaCalc.setEventStatus(EventStatus.DURATION_MET);
+                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
             }
             eventProc += 2;
             slaCalc.setEventProcessed(eventProc);
-            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
         }
 
         if (eventProc < 4) {
@@ -421,6 +585,15 @@ public class SLACalculatorMemory impleme
      * @throws JPAExecutorException
      */
     private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
+        if (actualStart == null) {
+            // job failed before starting
+            slaCalc.setEventProcessed(7);
+            slaCalc.setActualEnd(actualEnd);
+            slaCalc.setEventStatus(EventStatus.END_MISS);
+            slaCalc.setSLAStatus(SLAStatus.MISS);
+            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+            return getSLASummaryBean(slaCalc);
+        }
         slaCalc.setActualStart(actualStart);
         slaCalc.setActualEnd(actualEnd);
         SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
@@ -430,16 +603,18 @@ public class SLACalculatorMemory impleme
 
         byte eventProc = slaCalc.getEventProcessed();
         if (((eventProc >> 1) & 1) == 0) {
-            if (actualDuration > expectedDuration) {
+            if (expectedDuration != -1 && actualDuration > expectedDuration) {
                 slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
             }
-            else {
+            else if (expectedDuration != -1 && actualDuration <= expectedDuration) {
                 slaCalc.setEventStatus(EventStatus.DURATION_MET);
+                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
             }
             eventProc += 2;
             slaCalc.setEventProcessed(eventProc);
-            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
         }
+
         if (eventProc < 4) {
             slaCalc.setEventStatus(EventStatus.END_MISS);
             slaCalc.setSLAStatus(SLAStatus.MISS);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java Fri Jun 21 02:05:56 2013
@@ -83,7 +83,7 @@ public class SLARegistrationBean impleme
 
     @Basic
     @Column(name = "expected_duration")
-    private long expectedDuration = 0;
+    private long expectedDuration = -1;
 
     @Basic
     @Column(name = "user_name")

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java Fri Jun 21 02:05:56 2013
@@ -94,7 +94,7 @@ public class SLASummaryBean implements J
 
     @Basic
     @Column(name = "expected_duration")
-    private long expectedDuration;
+    private long expectedDuration = -1;
 
     @Basic
     @Column(name = "actual_start")

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java Fri Jun 21 02:05:56 2013
@@ -68,17 +68,14 @@ public class SLAJobEventListener extends
     }
 
     private void sendEventToSLAService(JobEvent event, String status) {
-        if (!status.equals(CoordinatorAction.Status.WAITING.name())
-                && !status.equals(CoordinatorAction.Status.SUSPENDED.name())) {
-            Date startTime = event.getStartTime();
-            Date endTime = event.getEndTime();
-            try {
-                Services.get().get(SLAService.class)
-                        .addStatusEvent(event.getId(), status, event.getEventStatus(), startTime, endTime);
-            }
-            catch (ServiceException se) {
-                XLog.getLog(SLAService.class).error("Exception happened while sending Job-Status event for SLA", se);
-            }
+        Date startTime = event.getStartTime();
+        Date endTime = event.getEndTime();
+        try {
+            Services.get().get(SLAService.class)
+                    .addStatusEvent(event.getId(), status, event.getEventStatus(), startTime, endTime);
+        }
+        catch (ServiceException se) {
+            XLog.getLog(SLAService.class).error("Exception happened while sending Job-Status event for SLA", se);
         }
     }
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Fri Jun 21 02:05:56 2013
@@ -24,6 +24,7 @@ import java.io.Writer;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +41,7 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event;
 import org.apache.oozie.client.event.JobEvent;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.client.rest.RestConstants;
@@ -215,7 +217,7 @@ public class TestEventGeneration extends
         ehs.setAppTypes(new HashSet<String>(Arrays.asList("coordinator_action")));
         assertEquals(queue.size(), 0);
         Date startTime = DateUtils.parseDateOozieTZ("2013-01-01T10:00Z");
-        Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:14Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:01Z");
         CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
                 false, 0);
         modifyCoordForRunning(coord);
@@ -234,7 +236,7 @@ public class TestEventGeneration extends
         assertEquals(action.getId(), event.getId());
         assertEquals(action.getJobId(), event.getParentId());
         assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
-        assertEquals(action.getCreatedTime(), event.getStartTime());
+        assertNull(event.getStartTime());
         assertEquals(coord.getUser(), event.getUser());
         assertEquals(coord.getAppName(), event.getAppName());
         assertEquals(0, queue.size());
@@ -251,13 +253,15 @@ public class TestEventGeneration extends
             }
         });
 
+        action = jpaService.execute(coordGetCmd);
         event = (JobEvent) queue.poll();
         assertEquals(EventStatus.STARTED, event.getEventStatus());
         assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
         assertEquals(action.getId(), event.getId());
         assertEquals(action.getJobId(), event.getParentId());
         assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
-        assertEquals(action.getCreatedTime(), event.getStartTime());
+        WorkflowJobBean wjb = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
+        assertEquals(wjb.getStartTime(), event.getStartTime());
         assertEquals(coord.getUser(), event.getUser());
         assertEquals(coord.getAppName(), event.getAppName());
 
@@ -266,17 +270,19 @@ public class TestEventGeneration extends
         WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
         wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
         jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+        action.setStatus(CoordinatorAction.Status.RUNNING);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
         new CoordActionCheckXCommand(action.getId(), 0).call();
-        Thread.sleep(300);
         action = jpaService.execute(coordGetCmd);
         assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
-        event = (JobEvent) queue.poll();
+        List<Event> list =  queue.pollBatch();
+        event = (JobEvent)list.get(list.size()-1);
         assertEquals(EventStatus.SUCCESS, event.getEventStatus());
         assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
         assertEquals(action.getId(), event.getId());
         assertEquals(action.getJobId(), event.getParentId());
         assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
-        assertEquals(action.getCreatedTime(), event.getStartTime());
+        assertEquals(wfJob.getStartTime(), event.getStartTime());
         assertEquals(coord.getUser(), event.getUser());
         assertEquals(coord.getAppName(), event.getAppName());
 
@@ -294,7 +300,7 @@ public class TestEventGeneration extends
         assertEquals(action.getId(), event.getId());
         assertEquals(action.getJobId(), event.getParentId());
         assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
-        assertEquals(action.getCreatedTime(), event.getStartTime());
+        assertEquals(wfJob.getStartTime(), event.getStartTime());
         assertEquals(coord.getUser(), event.getUser());
         assertEquals(coord.getAppName(), event.getAppName());
 
@@ -303,18 +309,25 @@ public class TestEventGeneration extends
         jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
         action.setStatus(CoordinatorAction.Status.SUSPENDED);
         jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
+        WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
+        ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
+        wfJob.setWorkflowInstance(wfInstance);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
         new CoordResumeXCommand(coord.getId()).call();
+        Thread.sleep(5000);
         CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
         assertEquals(EventStatus.STARTED, cevent.getEventStatus());
         assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
         assertEquals(action.getId(), cevent.getId());
         assertEquals(action.getJobId(), cevent.getParentId());
         assertEquals(action.getNominalTime(), cevent.getNominalTime());
-        assertEquals(action.getCreatedTime(), cevent.getStartTime());
+        assertEquals(wfJob.getStartTime(), cevent.getStartTime());
 
         // Action going to WAITING on Coord Rerun
         action.setStatus(CoordinatorAction.Status.SUCCEEDED);
         jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+        queue.clear();
         new CoordRerunXCommand(coord.getId(), RestConstants.JOB_COORD_RERUN_ACTION, "1", false, true)
                 .call();
         waitFor(3 * 100, new Predicate() {
@@ -329,7 +342,7 @@ public class TestEventGeneration extends
         assertEquals(action.getId(), cevent.getId());
         assertEquals(action.getJobId(), cevent.getParentId());
         assertEquals(action.getNominalTime(), cevent.getNominalTime());
-        assertEquals(action.getCreatedTime(), event.getStartTime());
+        assertEquals(wfJob.getStartTime(), event.getStartTime());
         assertNotNull(cevent.getMissingDeps());
 
     }
@@ -426,12 +439,15 @@ public class TestEventGeneration extends
         final CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
         final CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1,
                 CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+        WorkflowJobBean wjb = new WorkflowJobBean();
+        wjb.setId(action.getExternalId());
         JPAService jpaService = services.get(JPAService.class);
+        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
 
         CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
             @Override
             protected Void execute() {
-                CoordinatorXCommand.generateEvent(action, coord.getUser(), coord.getAppName());
+                CoordinatorXCommand.generateEvent(action, coord.getUser(), coord.getAppName(), null);
                 return null;
             }
         };