You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/04/18 01:55:21 UTC

svn commit: r1469108 [1/3] - in /oozie/trunk: ./ client/ client/src/main/java/org/apache/oozie/cli/ client/src/main/java/org/apache/oozie/client/ client/src/main/java/org/apache/oozie/client/event/jms/ client/src/main/java/org/apache/oozie/client/event...

Author: virag
Date: Wed Apr 17 23:55:20 2013
New Revision: 1469108

URL: http://svn.apache.org/r1469108
Log:
Combined patch for OOZIE-1234 and OOZIE-1235

Added:
    oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java
    oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/
    oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForUserJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java
    oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java
    oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
    oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
Modified:
    oozie/trunk/client/pom.xml
    oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
    oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
    oozie/trunk/core/pom.xml
    oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
    oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
    oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
    oozie/trunk/core/src/main/resources/oozie-default.xml
    oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java
    oozie/trunk/pom.xml
    oozie/trunk/release-log.txt

Modified: oozie/trunk/client/pom.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/client/pom.xml?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/pom.xml (original)
+++ oozie/trunk/client/pom.xml Wed Apr 17 23:55:20 2013
@@ -49,6 +49,24 @@
         </dependency>
 
         <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-client</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
             <scope>compile</scope>

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java Wed Apr 17 23:55:20 2013
@@ -56,6 +56,7 @@ import org.apache.oozie.client.BulkRespo
 import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.JMSConnectionInfo;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
 import org.apache.oozie.client.WorkflowAction;
@@ -110,6 +111,7 @@ public class OozieCLI {
     public static final String ACTION_OPTION = "action";
     public static final String DEFINITION_OPTION = "definition";
     public static final String CONFIG_CONTENT_OPTION = "configcontent";
+    public static final String JMS_INFO_OPTION = "jmsinfo";
 
     public static final String DO_AS_OPTION = "doas";
 
@@ -253,6 +255,7 @@ public class OozieCLI {
         Option changeValue = new Option(CHANGE_VALUE_OPTION, true,
                 "new endtime/concurrency/pausetime value for changing a coordinator job");
         Option info = new Option(INFO_OPTION, true, "info of a job");
+        Option jmsInfo = new Option (JMS_INFO_OPTION, true, "JMS Topic name and JNDI connection string for a job");
         Option offset = new Option(OFFSET_OPTION, true, "job info offset of actions (default '1', requires -info)");
         Option len = new Option(LEN_OPTION, true, "number of actions (default TOTAL ACTIONS, requires -info)");
         Option filter = new Option(FILTER_OPTION, true,
@@ -289,6 +292,7 @@ public class OozieCLI {
         actions.addOption(kill);
         actions.addOption(change);
         actions.addOption(info);
+        actions.addOption(jmsInfo);
         actions.addOption(rerun);
         actions.addOption(log);
         actions.addOption(definition);
@@ -747,6 +751,9 @@ public class OozieCLI {
         }
 
         try {
+            if (options.contains(JMS_INFO_OPTION)) {
+                printJMSInfo(wc.getJMSConnectionInfo(commandLine.getOptionValue(JMS_INFO_OPTION)));
+            }
             if (options.contains(SUBMIT_OPTION)) {
                 System.out.println(JOB_ID_PREFIX + wc.submit(getConfiguration(wc, commandLine)));
             }
@@ -1077,6 +1084,11 @@ public class OozieCLI {
         }
     }
 
+    private void printJMSInfo (JMSConnectionInfo jmsInfo) {
+        System.out.println("Topic Name         : " + jmsInfo.getTopicName());
+        System.out.println("JNDI Properties    : " + jmsInfo.getJNDIProperties());
+    }
+
     private void printWorkflowAction(WorkflowAction action, String timeZoneId, boolean verbose) {
         System.out.println("ID : " + maskIfNull(action.getId()));
 

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client;
+
+import java.util.Properties;
+
+/**
+ * JMS connection related information: the topic name together with the
+ * JNDI properties used for establishing the connection to the JMS server.
+ */
+public interface JMSConnectionInfo {
+
+    /**
+     * Retrieve the JMS topic name
+     * @return the topic name
+     */
+    String getTopicName();
+
+    /**
+     * Retrieve the JNDI properties for establishing connection to JMS server
+     * @return the JNDI properties, as a java.util.Properties object
+     */
+    Properties getJNDIProperties();
+
+}

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java Wed Apr 17 23:55:20 2013
@@ -667,6 +667,27 @@ public class OozieClient {
         }
     }
 
+
+    private class JMSInfo extends ClientCallable<JMSConnectionInfo> {
+
+        JMSInfo(String jobId) {
+            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
+                    RestConstants.JOB_SHOW_JMS_INFO));
+        }
+
+        protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException {
+            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
+                Reader reader = new InputStreamReader(conn.getInputStream());
+                JSONObject json = (JSONObject) JSONValue.parse(reader);
+                return JsonToBean.createJMSConnectionInfo(json);
+            }
+            else {
+                handleError(conn);
+            }
+            return null;
+        }
+    }
+
     private class WorkflowActionInfo extends ClientCallable<WorkflowAction> {
         WorkflowActionInfo(String actionId) {
             super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
@@ -699,6 +720,16 @@ public class OozieClient {
     }
 
     /**
+     * Get the JMS Connection info (topic name and JNDI properties) for a job
+     * @param jobId the job id, must not be empty
+     * @return JMSConnectionInfo object
+     * @throws OozieClientException thrown if the JMS connection info could not be retrieved
+     */
+    public JMSConnectionInfo getJMSConnectionInfo(String jobId) throws OozieClientException {
+        return new JMSInfo(jobId).call();
+    }
+
+    /**
      * Get the info of a workflow job and subset actions.
      *
      * @param jobId job Id.

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.jms;
+
+/**
+ * Class holding constants used in JMS selectors. It only holds constants and is not meant to be instantiated.
+ */
+public final class JMSHeaderConstants {
+    // JMS Application specific properties for selectors
+    public static final String EVENT_STATUS = "EVENT_STATUS";
+    public static final String APP_NAME = "APP_NAME";
+    public static final String USER = "USER";
+    public static final String MESSAGE_TYPE = "MESSAGE_TYPE";
+    public static final String APP_TYPE = "APP_TYPE";
+    // JMS Header property
+    public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT";
+    private JMSHeaderConstants() { } // constants holder: prevent instantiation
+}

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.oozie.client.event.message.EventMessage;
+
+/**
+ * Client utility to convert JMS message to EventMessage object
+ */
+public class JMSMessagingUtils {
+
+    private static final String DESERIALIZER_PROP = "oozie.msg.deserializer.";
+
+    /**
+     * Constructs the EventMessage object from JMS message. The deserializer class is looked up in the
+     * oozie_client.properties classpath resource; the JSON deserializer is used when none is configured.
+     * @param msg the JMS message, must be a non-null TextMessage
+     * @return the EventMessage
+     * @throws IOException if oozie_client.properties cannot be loaded or closed
+     * @throws JMSException if the message properties or body cannot be read
+     */
+    public static EventMessage getEventMessage(Message msg) throws IOException, JMSException {
+        if (msg == null) {
+            throw new IllegalArgumentException("Could not extract EventMessage as JMS message is null");
+        }
+        TextMessage textMessage = (TextMessage) msg;
+        String msgFormat = msg.getStringProperty(JMSHeaderConstants.MESSAGE_FORMAT);
+        InputStream is = JMSMessagingUtils.class.getClassLoader().getResourceAsStream("oozie_client.properties");
+        if (is == null) {
+            System.out.println("Using default deserializer");
+            return new JSONMessageDeserializer().getEventMessage(textMessage);
+        }
+        Properties jmsDeserializerInfo = new Properties();
+        try {
+            jmsDeserializerInfo.load(is);
+        }
+        finally {
+            is.close(); // Properties.load leaves the stream open, so always release it here
+        }
+        String deserializerClass = jmsDeserializerInfo.getProperty(DESERIALIZER_PROP + msgFormat);
+        return getDeserializer(deserializerClass).getEventMessage(textMessage);
+    }
+
+    /**
+     * Instantiates the deserializer with the given class name, or the JSON deserializer if no name is given
+     * @param deserializerString class name of the deserializer to load, may be null
+     * @return the deserializer instance
+     */
+    private static MessageDeserializer getDeserializer(String deserializerString) {
+        if (deserializerString == null) {
+            return new JSONMessageDeserializer();
+        }
+        try {
+            return (MessageDeserializer) Class.forName(deserializerString).newInstance();
+        }
+        catch (Exception e) {
+            throw new IllegalArgumentException("Could not access class " + deserializerString, e);
+        }
+    }
+}
\ No newline at end of file

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.WorkflowJobMessage;
+
+/**
+ * Message deserializer to convert from JSON to java object, using the mapper declared in MessageDeserializer
+ */
+public class JSONMessageDeserializer extends MessageDeserializer {
+    // Binds the JSON message body to clazz; any failure is rethrown as IllegalArgumentException with its cause
+    @Override
+    public <T> T getDeserializedObject(String messageBody, Class<T> clazz) {
+        try {
+            return mapper.readValue(messageBody, clazz);
+        }
+        catch (Exception exception) {
+            throw new IllegalArgumentException("Could not deserialize the JMS message using "
+                    + clazz.getCanonicalName(), exception);
+        }
+    }
+    // Copies the APP_TYPE, MESSAGE_TYPE, EVENT_STATUS, APP_NAME and USER message properties onto the workflow bean
+    @Override
+    public WorkflowJobMessage setPropertiesForObject(WorkflowJobMessage workflowJobMsg, Message message)
+            throws JMSException {
+        workflowJobMsg.setAppType(AppType.valueOf(message.getStringProperty(JMSHeaderConstants.APP_TYPE)));
+        workflowJobMsg.setMessageType(MessageType.valueOf(message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)));
+        workflowJobMsg.setEventStatus(EventStatus.valueOf(message.getStringProperty(JMSHeaderConstants.EVENT_STATUS)));
+        workflowJobMsg.setAppName(message.getStringProperty(JMSHeaderConstants.APP_NAME));
+        workflowJobMsg.setUser(message.getStringProperty(JMSHeaderConstants.USER));
+        return workflowJobMsg;
+
+    }
+    // Copies the APP_TYPE, MESSAGE_TYPE, EVENT_STATUS, APP_NAME and USER message properties onto the coord bean
+    @Override
+    public CoordinatorActionMessage setPropertiesForObject(CoordinatorActionMessage coordActionMsg, Message message)
+            throws JMSException {
+        coordActionMsg.setAppType(AppType.valueOf(message.getStringProperty(JMSHeaderConstants.APP_TYPE)));
+        coordActionMsg.setMessageType(MessageType.valueOf(message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)));
+        coordActionMsg.setEventStatus(EventStatus.valueOf(message.getStringProperty(JMSHeaderConstants.EVENT_STATUS)));
+        coordActionMsg.setAppName(message.getStringProperty(JMSHeaderConstants.APP_NAME));
+        coordActionMsg.setUser(message.getStringProperty(JMSHeaderConstants.USER));
+        return coordActionMsg;
+    }
+
+}

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.jms;
+
+import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.EventMessage;
+import org.apache.oozie.client.event.message.WorkflowJobMessage;
+import org.apache.oozie.client.event.Event;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.DeserializationConfig;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+
+/**
+ * Class to deserialize the jms message to java object
+ */
+public abstract class MessageDeserializer {
+    static final ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
+
+    static {
+        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    /**
+     * Constructs the event message from JMS message; the message class is chosen by the APP_TYPE property
+     *
+     * @param message the JMS message, expected to be a TextMessage with a non-empty body
+     * @return EventMessage
+     * @throws JMSException if the APP_TYPE property or the text body cannot be read
+     */
+    public EventMessage getEventMessage(Message message) throws JMSException {
+        String appTypeString = message.getStringProperty(JMSHeaderConstants.APP_TYPE);
+        String messageBody = ((TextMessage) message).getText();
+
+        if (appTypeString == null || appTypeString.isEmpty() || messageBody == null || messageBody.isEmpty()) {
+            throw new IllegalArgumentException("Could not extract OozieEventMessage. "
+                    + "AppType and/or MessageBody is null/empty. Apptype is " + appTypeString + " MessageBody is "
+                    + messageBody);
+        }
+        switch (Event.AppType.valueOf(appTypeString)) {
+            case WORKFLOW_JOB:
+                WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
+                return setPropertiesForObject(wfJobMsg, message);
+            case COORDINATOR_ACTION:
+                CoordinatorActionMessage caActionMsg = getDeserializedObject(messageBody,
+                        CoordinatorActionMessage.class);
+                return setPropertiesForObject(caActionMsg, message);
+            default:
+                throw new UnsupportedOperationException("Conversion of " + appTypeString
+                        + " to Event message is not supported");
+        }
+    }
+
+    /** Deserializes the message body into an instance of the given class */
+    protected abstract <T> T getDeserializedObject(String s, Class<T> clazz);
+
+    /**
+     * Set the JMS selector properties for the workflow job message object
+     *
+     * @param wfJobMessage the workflow job message
+     * @param message the JMS message
+     * @return WorkflowJobMessage
+     * @throws JMSException
+     */
+    public abstract WorkflowJobMessage setPropertiesForObject(WorkflowJobMessage wfJobMessage, Message message)
+            throws JMSException;
+
+    /**
+     * Set the JMS selector properties for coordinator action message
+     *
+     * @param coordActionMessage the coordinator action message
+     * @param message the JMS message
+     * @return CoordinatorActionMessage
+     * @throws JMSException
+     */
+    public abstract CoordinatorActionMessage setPropertiesForObject(CoordinatorActionMessage coordActionMessage,
+            Message message) throws JMSException;
+
+}

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.message;
+
+import java.util.Date;
+
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.Event.AppType;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Class holding attributes related to Coordinator action message.
+ * Attributes that are null are left out when the message is serialized (JsonSerialize inclusion NON_NULL).
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class CoordinatorActionMessage extends JobMessage {
+
+    @JsonProperty
+    private CoordinatorAction.Status status;
+    @JsonProperty
+    private Date nominalTime;
+    @JsonProperty
+    private String missingDependency;
+    @JsonProperty
+    private String errorCode;
+    @JsonProperty
+    private String errorMessage;
+
+    /**
+     * Default constructor, leaves every attribute of this class unset
+     */
+    public CoordinatorActionMessage() {
+        // Default constructor for jackson
+    }
+
+    /**
+     * Constructs the coordinator action message, with the app type fixed to COORDINATOR_ACTION
+     * @param eventStatus the event status
+     * @param coordinatorActionId the coord action id
+     * @param coordinatorJobId the parent job id
+     * @param startTime the created time of coord action
+     * @param endTime the end time of coord action
+     * @param nominalTime the nominal time of coord action
+     * @param status the status of coord action
+     * @param user the user of coordinator
+     * @param appName the app name of coordinator
+     * @param missingDependency the action's first missing dependency
+     * @param errorCode the action's error code
+     * @param errorMessage the action's error message
+     */
+    public CoordinatorActionMessage(EventStatus eventStatus, String coordinatorActionId,
+            String coordinatorJobId, Date startTime, Date endTime, Date nominalTime, CoordinatorAction.Status status,
+            String user, String appName, String missingDependency, String errorCode, String errorMessage) {
+        super(eventStatus, AppType.COORDINATOR_ACTION, coordinatorActionId, coordinatorJobId, startTime,
+                endTime, user, appName);
+        this.status = status;
+        this.nominalTime = nominalTime;
+        this.missingDependency = missingDependency;
+        this.errorCode = errorCode;
+        this.errorMessage = errorMessage;
+
+    }
+
+    /**
+     * Set the status of coordinator action
+     * @param status the CoordinatorAction status
+     */
+    public void setStatus(CoordinatorAction.Status status) {
+        this.status = status;
+    }
+
+    /**
+     * Get the status of coord action
+     * @return the CoordinatorAction status
+     */
+    public CoordinatorAction.Status getStatus() {
+        return status;
+    }
+
+    /**
+     * Set the nominal time
+     * @param nominalTime the nominal time of coord action
+     */
+    public void setNominalTime(Date nominalTime) {
+        this.nominalTime = nominalTime;
+    }
+
+    /**
+     * Get the nominal time
+     * @return the nominal time
+     */
+    public Date getNominalTime() {
+        return nominalTime;
+    }
+
+    /**
+     * Set the error code
+     * @param errorCode the action's error code
+     */
+    public void setErrorCode(String errorCode) {
+        this.errorCode = errorCode;
+    }
+
+    /**
+     * Get the error code
+     * @return the errorCode
+     */
+    public String getErrorCode() {
+        return errorCode;
+    }
+
+    /**
+     * Set the error message
+     * @param errorMessage the action's error message
+     */
+    public void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    /**
+     * Get the error message
+     * @return the errorMessage
+     */
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    /**
+     * Set the missing dependency
+     * @param missingDependency the action's first missing dependency
+     */
+    public void setMissingDependency(String missingDependency) {
+        this.missingDependency = missingDependency;
+    }
+
+    /**
+     * Get the missing dependency
+     * @return the missing Dependency
+     */
+    public String getMissingDependency() {
+        return missingDependency;
+    }
+
+}

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.message;
+
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.Event.AppType;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/**
+ * Base class which holds the attributes shared by event messages: the app type and the message type
+ *
+ */
+public abstract class EventMessage {
+
+    private Event.AppType appType;
+    private Event.MessageType messageType;
+
+    /**
+     * Default constructor; leaves the app type and message type unset
+     */
+    public EventMessage() {
+        // No-arg constructor required by Jackson
+    }
+
+    /**
+     * Constructs the event message using message type and app type
+     * @param messageType the message type
+     * @param appType the app type
+     */
+    protected EventMessage(MessageType messageType, AppType appType) {
+        this.messageType = messageType;
+        this.appType = appType;
+    }
+
+    /**
+     * Sets the appType for an event
+     * @param appType the app type
+     */
+    public void setAppType(AppType appType) {
+        this.appType = appType;
+    }
+
+    /**
+     * Returns the appType for an event
+     * @return the AppType
+     */
+    @JsonIgnore
+    public AppType getAppType() {
+        return appType;
+    }
+
+    /**
+     * Sets the message type for an event
+     * @param messageType the message type
+     */
+    public void setMessageType(MessageType messageType) {
+        this.messageType = messageType;
+    }
+
+    /**
+     * Returns the message type for an event
+     * @return the MessageType
+     */
+    @JsonIgnore
+    public MessageType getMessageType() {
+        return messageType;
+    }
+
+}

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.message;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.client.event.jms.JMSHeaderConstants;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+
+/**
+ * Class holding attributes related to a job message
+ *
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class JobMessage extends EventMessage {
+
+    @JsonProperty
+    private String id;
+    @JsonProperty
+    private String parentId;
+    @JsonProperty
+    private Date startTime;
+    @JsonProperty
+    private Date endTime;
+    // Getters for the fields below are marked @JsonIgnore
+    private JobEvent.EventStatus eventStatus;
+    private String appName;
+    private Map<String, String> jmsMessageProperties;
+    private String user;
+
+    /**
+     * Default constructor; does not create the JMS message properties map
+     */
+    public JobMessage() {
+        // Default constructor for jackson
+    }
+
+    /**
+     * Constructs a job message and fills the JMS message properties map from the arguments
+     *
+     * @param eventStatus the event status
+     * @param appType the appType for event
+     * @param id the id of job
+     * @param parentId the parent id of job
+     * @param startTime the start time of job
+     * @param endTime the end time of job
+     * @param user user of the job
+     * @param appName appName for job
+     */
+    public JobMessage(JobEvent.EventStatus eventStatus, AppType appType, String id, String parentId, Date startTime,
+            Date endTime, String user, String appName) {
+        super(MessageType.JOB, appType);
+        this.eventStatus = eventStatus;
+        this.id = id;
+        this.parentId = parentId;
+        this.startTime = startTime;
+        this.appName = appName;
+        this.user = user;
+        this.endTime = endTime;
+
+        jmsMessageProperties = new HashMap<String, String>();
+        jmsMessageProperties.put(JMSHeaderConstants.APP_TYPE, appType.toString());
+        jmsMessageProperties.put(JMSHeaderConstants.MESSAGE_TYPE, MessageType.JOB.toString());
+        jmsMessageProperties.put(JMSHeaderConstants.EVENT_STATUS, eventStatus.toString());
+        jmsMessageProperties.put(JMSHeaderConstants.APP_NAME, appName);
+        jmsMessageProperties.put(JMSHeaderConstants.USER, user);
+    }
+
+    /**
+     * Sets the Job id for message
+     *
+     * @param id the job id
+     */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Gets the job id
+     *
+     * @return the Job id
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Sets the parent job id for message
+     *
+     * @param parentId the parent job id
+     */
+    public void setParentId(String parentId) {
+        this.parentId = parentId;
+    }
+
+    /**
+     * Gets the parent job id
+     *
+     * @return the parentId
+     */
+    public String getParentId() {
+        return parentId;
+    }
+
+    /**
+     * Sets the job start time for message
+     *
+     * @param startTime the job start time
+     */
+    public void setStartTime(Date startTime) {
+        this.startTime = startTime;
+    }
+
+    /**
+     * Gets the job start time
+     *
+     * @return the start time
+     */
+    public Date getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * Sets the job end time for message
+     *
+     * @param endTime the job end time
+     */
+    public void setEndTime(Date endTime) {
+        this.endTime = endTime;
+    }
+
+    /**
+     * Gets the job end time
+     *
+     * @return the end time
+     */
+    public Date getEndTime() {
+        return endTime;
+    }
+
+    /**
+     * Sets the job's app name for message
+     *
+     * @param appName the job's app name
+     */
+    public void setAppName(String appName) {
+        this.appName = appName;
+    }
+
+    /**
+     * Gets the job's app name
+     *
+     * @return the app name
+     */
+    @JsonIgnore
+    public String getAppName() {
+        return appName;
+    }
+
+    /**
+     * Sets the JMS selectors for message, replacing the current properties map
+     *
+     * @param properties the jms selector key value pair
+     */
+    public void setMessageProperties(Map<String, String> properties) {
+        jmsMessageProperties = properties;
+    }
+
+    /**
+     * Gets the message properties
+     *
+     * @return the message properties
+     */
+    @JsonIgnore
+    public Map<String, String> getMessageProperties() {
+        return jmsMessageProperties;
+    }
+
+    /**
+     * Sets the job user for the message
+     *
+     * @param user the job user
+     */
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    /**
+     * Gets the job user
+     *
+     * @return the user
+     */
+    @JsonIgnore
+    public String getUser() {
+        return user;
+    }
+
+    /**
+     * Sets the event status
+     *
+     * @param eventStatus the event status
+     */
+    public void setEventStatus(JobEvent.EventStatus eventStatus) {
+        this.eventStatus = eventStatus;
+    }
+
+    /**
+     * Gets the event status
+     *
+     * @return the event status
+     */
+    @JsonIgnore
+    public JobEvent.EventStatus getEventStatus() {
+        return eventStatus;
+    }
+
+}

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.message;
+
+import java.util.Date;
+
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Class holding attributes related to a workflow job message
+ *
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class WorkflowJobMessage extends JobMessage {
+
+    @JsonProperty
+    private WorkflowJob.Status status;
+    @JsonProperty
+    private String errorCode;
+    @JsonProperty
+    private String errorMessage;
+
+    /**
+     * Default constructor
+     */
+    public WorkflowJobMessage() {
+        // Default constructor for jackson
+    }
+
+    /**
+     * Constructor for a workflow job message; the app type is always WORKFLOW_JOB
+     * @param eventStatus event status
+     * @param workflowJobId the workflow job id
+     * @param coordinatorActionId the parent coordinator action id
+     * @param startTime start time of workflow
+     * @param endTime end time of workflow
+     * @param status status of workflow
+     * @param user the user
+     * @param appName appName of workflow
+     * @param errorCode errorCode of the failed wf actions
+     * @param errorMessage errorMessage of the failed wf action
+     */
+    public WorkflowJobMessage(EventStatus eventStatus, String workflowJobId,
+            String coordinatorActionId, Date startTime, Date endTime, WorkflowJob.Status status, String user,
+            String appName, String errorCode, String errorMessage) {
+        super(eventStatus, Event.AppType.WORKFLOW_JOB, workflowJobId, coordinatorActionId, startTime,
+                endTime, user, appName);
+        this.status = status;
+        this.errorCode = errorCode;
+        this.errorMessage = errorMessage;
+    }
+
+    /**
+     * Set the workflow job status
+     * @param status the workflow job status
+     */
+    public void setStatus(WorkflowJob.Status status) {
+        this.status = status;
+    }
+
+    /**
+     * Get the workflow job status
+     * @return the workflow status
+     */
+    public WorkflowJob.Status getStatus() {
+        return status;
+    }
+
+    /**
+     * Set the workflow error code
+     * @param errorCode the workflow error code
+     */
+    public void setErrorCode(String errorCode) {
+        this.errorCode = errorCode;
+    }
+
+    /**
+     * Get the workflow error code
+     * @return the error code
+     */
+    public String getErrorCode() {
+        return errorCode;
+    }
+
+    /**
+     * Set the workflow error message
+     * @param errorMessage the workflow error message
+     */
+    public void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    /**
+     * Get the workflow error message
+     * @return the error message
+     */
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+}

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -196,4 +196,7 @@ public interface JsonTags {
     public static final String TIME_ZOME_DISPLAY_NAME = "timezoneDisplayName";
     public static final String TIME_ZONE_ID = "timezoneId";
 
+    public static final String JMS_TOPIC_NAME = "jmsTopic";
+    public static final String JMS_JNDI_PROPERTIES = "jmsJNDIProps";
+
 }

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,10 +21,12 @@ import org.apache.oozie.client.BulkRespo
 import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.JMSConnectionInfo;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
 
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
@@ -34,6 +36,9 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
 
 /**
  * JSON to bean converter for {@link WorkflowAction}, {@link WorkflowJob}, {@link CoordinatorAction}
@@ -65,6 +70,7 @@ public class JsonToBean {
     private static final Map<String, Property> COORD_ACTION = new HashMap<String, Property>();
     private static final Map<String, Property> BUNDLE_JOB = new HashMap<String, Property>();
     private static final Map<String, Property> BULK_RESPONSE = new HashMap<String, Property>();
+    private static final Map<String, Property> JMS_CONNECTION_INFO = new HashMap<String, Property>();
 
     static {
         WF_ACTION.put("getId", new Property(JsonTags.WORKFLOW_ACTION_ID, String.class));
@@ -178,6 +184,8 @@ public class JsonToBean {
         BULK_RESPONSE.put("getCoordinator", new Property(JsonTags.BULK_RESPONSE_COORDINATOR, CoordinatorJob.class, true));
         BULK_RESPONSE.put("getAction", new Property(JsonTags.BULK_RESPONSE_ACTION, CoordinatorAction.class, true));
 
+        JMS_CONNECTION_INFO.put("getTopicName", new Property(JsonTags.JMS_TOPIC_NAME, String.class));
+        JMS_CONNECTION_INFO.put("getJNDIProperties", new Property(JsonTags.JMS_JNDI_PROPERTIES, Properties.class));
     }
 
     /**
@@ -214,6 +222,7 @@ public class JsonToBean {
                 else if (prop.type == CoordinatorJob.class) {
                     return createCoordinatorJobList((JSONArray) json.get(prop.label));
                 }
+
                 else {
                     throw new RuntimeException("Unsupported list type : " + prop.type.getSimpleName());
                 }
@@ -243,6 +252,15 @@ public class JsonToBean {
             else if (type == WorkflowAction.class) {
                 return createWorkflowAction((JSONObject) obj);
             }
+            else if (type == Properties.class){
+                JSONObject jsonMap = (JSONObject)JSONValue.parse((String)obj);
+                Properties props = new Properties();
+                Set<Map.Entry> entrySet = jsonMap.entrySet();
+                for (Map.Entry jsonEntry: entrySet){
+                    props.put(jsonEntry.getKey(), jsonEntry.getValue());
+                }
+                return props;
+            }
             else {
                 throw new RuntimeException("Unsupported type : " + type.getSimpleName());
             }
@@ -339,6 +357,18 @@ public class JsonToBean {
                                                        new JsonInvocationHandler(COORD_JOB, json));
     }
 
+
+    /**
+     * Creates a JMS connection info bean from a JSON object.
+     *
+     * @param json json object.
+     * @return a {@link JMSConnectionInfo} proxy whose getters read their values from the JSON object.
+     */
+    public static JMSConnectionInfo createJMSConnectionInfo(JSONObject json) {
+        return (JMSConnectionInfo) Proxy.newProxyInstance(JsonToBean.class.getClassLoader(),
+                new Class[] { JMSConnectionInfo.class }, new JsonInvocationHandler(JMS_CONNECTION_INFO, json));
+    }
+
     /**
      * Creates a list of coordinator job beans from a JSON array.
      *

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java Wed Apr 17 23:55:20 2013
@@ -146,8 +146,10 @@ public interface RestConstants {
     public static final String SLA = "sla";
 
     public static final String DO_AS_PARAM = "doAs";
-    
+
     public static final String TIME_ZONE_PARAM = "timezone";
-    
+
     public static final String ADMIN_TIME_ZONES_RESOURCE = "available-timezones";
+
+    public static final String JOB_SHOW_JMS_INFO = "jmsinfo";
 }

Modified: oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java (original)
+++ oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,12 +20,15 @@ package org.apache.oozie.client.rest;
 import junit.framework.TestCase;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.JMSConnectionInfo;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
 
 import java.util.List;
+import java.util.Properties;
 
 public class TestJsonToBean extends TestCase {
 
@@ -312,4 +315,25 @@ public class TestJsonToBean extends Test
         assertEquals("cj2", list.get(1).getId());
     }
 
+    // Builds a JSON object holding a topic name and two JNDI properties serialized as a JSON string
+    private JSONObject createJMSInfoJSONObject(){
+        JSONObject json = new JSONObject();
+        json.put(JsonTags.JMS_TOPIC_NAME, "topic");
+        Properties props = new Properties();
+        props.put("k1", "v1");
+        props.put("k2", "v2");
+        json.put(JsonTags.JMS_JNDI_PROPERTIES, JSONValue.toJSONString(props));
+        return json;
+    }
+
+    public void testParseJMSInfo() {
+        JSONObject json = createJMSInfoJSONObject();
+        JMSConnectionInfo jmsDetails = JsonToBean.createJMSConnectionInfo(json);
+        assertEquals("topic", jmsDetails.getTopicName());
+        Properties jmsProps = jmsDetails.getJNDIProperties();
+        assertNotNull(jmsProps); // check the instance that is dereferenced below, not a second proxy call
+        assertEquals("v1", jmsProps.get("k1"));
+        assertEquals("v2", jmsProps.get("k2")); // every serialized entry must survive the round trip
+    }
+
 }

Modified: oozie/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/pom.xml?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/pom.xml (original)
+++ oozie/trunk/core/pom.xml Wed Apr 17 23:55:20 2013
@@ -256,12 +256,6 @@
 
         <dependency>
             <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-client</artifactId>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-broker</artifactId>
             <scope>test</scope>
         </dependency>

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java Wed Apr 17 23:55:20 2013
@@ -23,6 +23,9 @@ import java.io.Writer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.JMSInfoXCommand;
 
 public abstract class BaseEngine {
     public static final String USE_XCOMMAND = "oozie.useXCommand";
@@ -194,4 +197,20 @@ public abstract class BaseEngine {
      */
     public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
 
+    /**
+     * Return the JMS connection info for a job by running a {@link JMSInfoXCommand}.
+     *
+     * @param jobId job Id.
+     * @return the JMS info bean
+     * @throws DagEngineException thrown if the jms info could not be obtained (wraps the {@link CommandException}).
+     */
+    public JMSConnectionInfoBean getJMSConnectionInfo(String jobId) throws DagEngineException {
+        try {
+                return new JMSInfoXCommand(jobId).call();
+        }
+        catch (CommandException ex) {
+            throw new DagEngineException(ex);
+        }
+    }
+
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java Wed Apr 17 23:55:20 2013
@@ -76,7 +76,9 @@ import org.apache.openjpa.persistence.jd
                 "c.id, c.appName, c.status FROM CoordinatorActionBean a, CoordinatorJobBean c " +
                 "WHERE a.jobId = c.id AND c.bundleId = :bundleId ORDER BY a.jobId, a.createdTimestamp"),
 
-        @NamedQuery(name = "BULK_MONITOR_COUNT_QUERY", query = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c") })
+        @NamedQuery(name = "BULK_MONITOR_COUNT_QUERY", query = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c"),
+
+        @NamedQuery(name = "GET_BUNDLE_JOB_FOR_USER", query = "select w.user from BundleJobBean w where w.id = :id") })
 public class BundleJobBean extends JsonBundleJob implements Writable {
 
     @Basic

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java Wed Apr 17 23:55:20 2013
@@ -75,7 +75,10 @@ import org.apache.openjpa.persistence.jd
 
         @NamedQuery(name = "GET_COORD_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from CoordinatorJobBean w where w.bundleId = :parentId and (w.status NOT IN ('SUCCEEDED', 'FAILED', 'KILLED', 'DONEWITHERROR') OR w.lastModifiedTimestamp >= :lastModTime)"),
 
-        @NamedQuery(name = "GET_COORD_JOB_FOR_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id")
+        @NamedQuery(name = "GET_COORD_JOB_FOR_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id"),
+
+        @NamedQuery(name = "GET_COORD_JOB_FOR_USER", query = "select w.user from CoordinatorJobBean w where w.id = :id")
+
 })
 public class CoordinatorJobBean extends JsonCoordinatorJob implements Writable {
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java Wed Apr 17 23:55:20 2013
@@ -234,6 +234,8 @@ public enum ErrorCode {
 
     E1501(XLog.STD, "Error in getting HCat Access [{0}]"),
 
+    E1601(XLog.STD, "Cannot retrieve JMS connection info [{0}]"),
+
     ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
 
     private String template;

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -71,8 +71,9 @@ import org.apache.openjpa.persistence.jd
 
     @NamedQuery(name = "GET_WORKFLOWS_WITH_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId = :parentId"),
 
-    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.status = 'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.endTimestamp >= :endTime)")
+    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.status = 'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.endTimestamp >= :endTime)"),
 
+    @NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from WorkflowJobBean w where w.id = :id")
         })
 public class WorkflowJobBean extends JsonWorkflowJob implements Writable {
 

Added: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.rest;
+
+import java.util.Properties;
+
+import org.apache.oozie.client.JMSConnectionInfo;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+/**
+ * JMS connection info bean holding the JMS related information for a job: the JNDI
+ * properties for the JMS connection and the topic name. It can render itself as JSON.
+ */
+public class JMSConnectionInfoBean implements JMSConnectionInfo, JsonBean {
+
+    private Properties JNDIProperties;
+    private String topicName;
+
+    /** Renders this bean as JSON by delegating to {@link #toJSONObject(String)} with "GMT". */
+    @Override
+    public JSONObject toJSONObject() {
+        return toJSONObject("GMT");
+    }
+
+    /**
+     * Set the topic name
+     * @param topicName the topic name to store in this bean
+     */
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    /**
+     * Set the JNDI properties for the JMS connection
+     * @param JNDIProperties the properties to store; the reference is kept as is, not copied
+     */
+    public void setJNDIProperties(Properties JNDIProperties) {
+        this.JNDIProperties = JNDIProperties;
+    }
+
+    @Override
+    public String getTopicName() {
+        return topicName;
+    }
+
+    @Override
+    public Properties getJNDIProperties() {
+        return JNDIProperties;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public JSONObject toJSONObject(String timeZoneId) { // timeZoneId is not used by this bean
+        JSONObject json = new JSONObject();
+        json.put(JsonTags.JMS_JNDI_PROPERTIES, JSONValue.toJSONString(JNDIProperties)); // value is a JSON string
+        json.put(JsonTags.JMS_TOPIC_NAME, topicName);
+        return json;
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.jms.DefaultJMSServerInfo;
+import org.apache.oozie.jms.JMSJobEventListener;
+import org.apache.oozie.jms.JMSServerInfo;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Command that builds the JMS connection info bean for a job. It requires no lock and has
+ * no state to load or preconditions to verify; all of the work is done in execute().
+ */
+public class JMSInfoXCommand extends XCommand<JMSConnectionInfoBean> {
+
+    private final String jobId;
+    private final Configuration conf = Services.get().getConf();
+
+    /**
+     * Constructor for creating the JMSInfoXCommand
+     * @param jobId id of the job whose JMS connection info is requested
+     */
+    public JMSInfoXCommand(String jobId) {
+        super("jms_info", "jms_info", 0);
+        this.jobId = jobId;
+    }
+
+    /**
+     * Builds the bean from the configured connection properties and the job id, using the
+     * configured JMSServerInfo implementation (DefaultJMSServerInfo when none is configured).
+     * @return the JMS connection info bean produced by the JMSServerInfo implementation
+     * @throws CommandException with E1601 wrapping any exception raised while building the bean
+     */
+    @Override
+    protected JMSConnectionInfoBean execute() throws CommandException {
+        String connectionProperties = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
+        Class<?> implClass = conf.getClass(JMSAccessorService.JMS_PRODUCER_CONNECTION_INFO_IMPL, DefaultJMSServerInfo.class);
+        try {
+            JMSServerInfo jmsServerInfo = (implClass == DefaultJMSServerInfo.class) ? new DefaultJMSServerInfo()
+                    : (JMSServerInfo) ReflectionUtils.newInstance(implClass, null);
+            return jmsServerInfo.getJMSConnectionInfoBean(connectionProperties, jobId);
+        }
+        catch (Exception e) {
+            throw new CommandException(ErrorCode.E1601, e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return false;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return jobId;
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java Wed Apr 17 23:55:20 2013
@@ -110,7 +110,7 @@ public class CoordinatorActionEvent exte
     }
 
     public void setErrorMessage(String msg) {
-        errorCode = msg;
+        errorMessage = msg;
     }
 
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java Wed Apr 17 23:55:20 2013
@@ -40,129 +40,33 @@ public abstract class JobEventListener {
     public abstract void destroy();
 
     /**
-     * On workflow job transition to start state
+     * On workflow job transition
      * @param WorkflowJobEvent
      */
-    public abstract void onWorkflowJobStart(WorkflowJobEvent wje);
+    public abstract void onWorkflowJobEvent(WorkflowJobEvent wje);
 
     /**
-     * On workflow job transition to success state
-     * @param WorkflowJobEvent
-     */
-    public abstract void onWorkflowJobSuccess(WorkflowJobEvent wje);
-
-    /**
-     * On workflow job transition to failure state
-     * @param WorkflowJobEvent
-     */
-    public abstract void onWorkflowJobFailure(WorkflowJobEvent wje);
-
-    /**
-     * On workflow job transition to suspend state
-     * @param WorkflowJobEvent
-     */
-    public abstract void onWorkflowJobSuspend(WorkflowJobEvent wje);
-
-    /**
-     * On workflow action transition to start state
-     * @param WorkflowActionEvent
-     */
-    public abstract void onWorkflowActionStart(WorkflowActionEvent wae);
-
-    /**
-     * On workflow action transition to success state
+     * On workflow action transition
      * @param WorkflowActionEvent
      */
-    public abstract void onWorkflowActionSuccess(WorkflowActionEvent wae);
-
-    /**
-     * On workflow action transition to failure state
-     * @param WorkflowActionEvent
-     */
-    public abstract void onWorkflowActionFailure(WorkflowActionEvent wae);
-
-    /**
-     * On workflow action transition to suspend state
-     * @param WorkflowActionEvent
-     */
-    public abstract void onWorkflowActionSuspend(WorkflowActionEvent wae);
-
-    /**
-     * On coord job transition to start state
-     * @param CoordinatorJobEvent
-     */
-    public abstract void onCoordinatorJobStart(CoordinatorJobEvent wje);
-
-    /**
-     * On coord job transition to success state
-     * @param CoordinatorJobEvent
-     */
-    public abstract void onCoordinatorJobSuccess(CoordinatorJobEvent wje);
+    public abstract void onWorkflowActionEvent(WorkflowActionEvent wae);
 
     /**
-     * On coord job transition to failure state
+     * On coordinator job transition
      * @param CoordinatorJobEvent
      */
-    public abstract void onCoordinatorJobFailure(CoordinatorJobEvent wje);
+    public abstract void onCoordinatorJobEvent(CoordinatorJobEvent wje);
 
     /**
-     * On coord job transition to suspend state
-     * @param CoordinatorJobEvent
-     */
-    public abstract void onCoordinatorJobSuspend(CoordinatorJobEvent wje);
-
-    /**
-     * On coord action transition to waiting state
-     * @param CoordinatorActionEvent
-     */
-    public abstract void onCoordinatorActionWaiting(CoordinatorActionEvent wae);
-
-    /**
-     * On coord action transition to start state
-     * @param CoordinatorActionEvent
-     */
-    public abstract void onCoordinatorActionStart(CoordinatorActionEvent wae);
-
-    /**
-     * On coord action transition to success state
-     * @param CoordinatorActionEvent
-     */
-    public abstract void onCoordinatorActionSuccess(CoordinatorActionEvent wae);
-
-    /**
-     * On coord action transition to failure state
+     * On coordinator action transition
      * @param CoordinatorActionEvent
      */
-    public abstract void onCoordinatorActionFailure(CoordinatorActionEvent wae);
-
-    /**
-     * On coord action transition to suspend state
-     * @param CoordinatorActionEvent
-     */
-    public abstract void onCoordinatorActionSuspend(CoordinatorActionEvent wae);
-
-    /**
-     * On bundle job transition to start state
-     * @param BundleJobEvent
-     */
-    public abstract void onBundleJobStart(BundleJobEvent wje);
-
-    /**
-     * On bundle job transition to success state
-     * @param BundleJobEvent
-     */
-    public abstract void onBundleJobSuccess(BundleJobEvent wje);
-
-    /**
-     * On bundle job transition to failure state
-     * @param BundleJobEvent
-     */
-    public abstract void onBundleJobFailure(BundleJobEvent wje);
+    public abstract void onCoordinatorActionEvent(CoordinatorActionEvent wae);
 
     /**
-     * On bundle job transition to suspend state
+     * On bundle job transition
      * @param BundleJobEvent
      */
-    public abstract void onBundleJobSuspend(BundleJobEvent wje);
+    public abstract void onBundleJobEvent(BundleJobEvent wje);
 
 }

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event.messaging;
+
+public class JSONMessageSerializer extends MessageSerializer { // serializer for the "json" message format
+    public static final String MESSAGE_FORMAT = "json"; // value reported by getMessageFormat()
+    /** Returns the object written out as a string by the mapper; failures surface as IllegalArgumentException. */
+    @Override
+    public String getSerializedObject(Object object) {
+        try {
+            return mapper.writeValueAsString(object); // mapper is the ObjectMapper declared in MessageSerializer
+        }
+        catch (Exception exception) { // any failure is rethrown unchecked with the original cause attached
+            throw new IllegalArgumentException("Could not construct JSONMessage.", exception);
+        }
+    }
+
+    @Override
+    public String getMessageFormat() {
+        return MESSAGE_FORMAT;
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event.messaging;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.WorkflowJobMessage;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.service.Services;
+
+/**
+ * Factory for constructing messages from job events and for retrieving the configured serializer
+ */
+public class MessageFactory {
+
+    public static final String OOZIE_MESSAGE_FORMAT = Services.get().getConf().get("message.format", "json");
+    public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.message.serialize.";
+
+    private static class MessageSerializerHolder { // initialized on first access to INSTANCE
+        private static String messageSerializerInstance = Services
+                .get()
+                .getConf()
+                .get(OOZIE_MESSAGE_SERIALIZE + OOZIE_MESSAGE_FORMAT,
+                        "org.apache.oozie.event.messaging.JSONMessageSerializer"); // a class name, not an instance
+        public static final MessageSerializer INSTANCE;
+        static {
+            try {
+                INSTANCE = (MessageSerializer) ReflectionUtils.newInstance(Class.forName(messageSerializerInstance),
+                        null);
+            }
+            catch (ClassNotFoundException cnfe) { // configured class name could not be resolved
+                throw new IllegalStateException("Could not construct the serializer ", cnfe);
+            }
+        }
+    }
+
+    /**
+     * Gets the configured serializer: a single instance of the class named by the configuration key
+     * OOZIE_MESSAGE_SERIALIZE + OOZIE_MESSAGE_FORMAT, falling back to JSONMessageSerializer.
+     * @return the shared serializer instance held by MessageSerializerHolder
+     */
+    public static MessageSerializer getMessageSerializer() {
+        return MessageSerializerHolder.INSTANCE;
+    }
+
+    /**
+     * Constructs the workflow job message for a workflow job event, passing on the event's event
+     * status, id, parent id, start and end time, status, user, app name, error code and error message.
+     * @param wfJobEvent the workflow job event
+     * @return the newly constructed workflow job message
+     */
+    public static WorkflowJobMessage createWorkflowJobMessage(WorkflowJobEvent wfJobEvent) {
+        WorkflowJobMessage wfJobMessage = new WorkflowJobMessage(wfJobEvent.getEventStatus(), wfJobEvent.getId(),
+                wfJobEvent.getParentId(), wfJobEvent.getStartTime(), wfJobEvent.getEndTime(), wfJobEvent.getStatus(),
+                wfJobEvent.getUser(), wfJobEvent.getAppName(), wfJobEvent.getErrorCode(), wfJobEvent.getErrorMessage());
+        return wfJobMessage;
+    }
+
+    /**
+     * Constructs the coordinator action message for a coordinator action event. Besides the
+     * event status, id, parent id, start and end time, status, user, app name, error code and
+     * error message, the message also receives the event's nominal time and missing dependencies.
+     * @param coordActionEvent the coordinator action event
+     * @return the newly constructed coordinator action message
+     */
+    public static CoordinatorActionMessage createCoordinatorActionMessage(CoordinatorActionEvent coordActionEvent) {
+        CoordinatorActionMessage coordActionMessage = new CoordinatorActionMessage(coordActionEvent.getEventStatus(),
+                coordActionEvent.getId(), coordActionEvent.getParentId(), coordActionEvent.getStartTime(),
+                coordActionEvent.getEndTime(), coordActionEvent.getNominalTime(), coordActionEvent.getStatus(),
+                coordActionEvent.getUser(), coordActionEvent.getAppName(), coordActionEvent.getMissingDeps(),
+                coordActionEvent.getErrorCode(), coordActionEvent.getErrorMessage());
+        return coordActionMessage;
+    }
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event.messaging;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Base class for message serializers: turns a java object into a string and reports the
+ * name of the message format it produces. Holds the ObjectMapper shared by subclasses.
+ */
+public abstract class MessageSerializer {
+    static ObjectMapper mapper = new ObjectMapper(); // Thread-safe. Package-private: visible to this package only.
+
+    /**
+     * Get the serialized string from object
+     * @param object the java object
+     * @return the string form of the object in this serializer's format
+     */
+    public abstract String getSerializedObject(Object object);
+
+    /**
+     * Get the message format for the serializer
+     * @return the format name, e.g. "json" for JSONMessageSerializer
+     */
+    public abstract String getMessageFormat();
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Retrieve the name of the user submitting the bundle job, by running the
+ * GET_BUNDLE_JOB_FOR_USER named query with the bundle job id.
+ */
+public class BundleJobGetForUserJPAExecutor implements JPAExecutor<String> {
+
+    private final String bundleJobId;
+
+    /**
+     * @param bundleJobId id of the bundle job to look up; checked with ParamChecker.notNull
+     */
+    public BundleJobGetForUserJPAExecutor(String bundleJobId) {
+        ParamChecker.notNull(bundleJobId, "bundleJobId");
+        this.bundleJobId = bundleJobId;
+    }
+
+    /** @return the name of this executor, "BundleJobGetForUserJPAExecutor" */
+    @Override
+    public String getName() {
+        return "BundleJobGetForUserJPAExecutor";
+    }
+
+    /**
+     * Runs the GET_BUNDLE_JOB_FOR_USER named query with the "id" parameter set to the bundle job id.
+     *
+     * @param em entity manager used to create and run the named query
+     * @return the single result of the query cast to String, i.e. the user of the bundle job
+     * @throws JPAExecutorException with E0603 wrapping any exception raised while querying
+     */
+    @Override
+    public String execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = em.createNamedQuery("GET_BUNDLE_JOB_FOR_USER");
+            q.setParameter("id", bundleJobId);
+            return (String) q.getSingleResult();
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+}