You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/05/08 19:59:53 UTC

svn commit: r1480380 [1/2] - in /oozie/trunk: ./ client/src/main/java/org/apache/oozie/cli/ client/src/main/java/org/apache/oozie/client/ client/src/main/java/org/apache/oozie/client/event/ client/src/main/java/org/apache/oozie/client/event/jms/ client...

Author: virag
Date: Wed May  8 17:59:52 2013
New Revision: 1480380

URL: http://svn.apache.org/r1480380
Log:
OOZIE-1347 Additions to JMS topic API (virag)

Added:
    oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java
Modified:
    oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
    oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
    oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
    oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
    oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
    oozie/trunk/core/src/main/resources/oozie-default.xml
    oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
    oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java
    oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java
    oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java
    oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
    oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java
    oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki
    oozie/trunk/release-log.txt
    oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java Wed May  8 17:59:52 2013
@@ -111,7 +111,6 @@ public class OozieCLI {
     public static final String ACTION_OPTION = "action";
     public static final String DEFINITION_OPTION = "definition";
     public static final String CONFIG_CONTENT_OPTION = "configcontent";
-    public static final String JMS_INFO_OPTION = "jmsinfo";
 
     public static final String DO_AS_OPTION = "doas";
 
@@ -255,7 +254,6 @@ public class OozieCLI {
         Option changeValue = new Option(CHANGE_VALUE_OPTION, true,
                 "new endtime/concurrency/pausetime value for changing a coordinator job");
         Option info = new Option(INFO_OPTION, true, "info of a job");
-        Option jmsInfo = new Option (JMS_INFO_OPTION, true, "JMS Topic name and JNDI connection string for a job");
         Option offset = new Option(OFFSET_OPTION, true, "job info offset of actions (default '1', requires -info)");
         Option len = new Option(LEN_OPTION, true, "number of actions (default TOTAL ACTIONS, requires -info)");
         Option filter = new Option(FILTER_OPTION, true,
@@ -292,7 +290,6 @@ public class OozieCLI {
         actions.addOption(kill);
         actions.addOption(change);
         actions.addOption(info);
-        actions.addOption(jmsInfo);
         actions.addOption(rerun);
         actions.addOption(log);
         actions.addOption(definition);
@@ -751,9 +748,6 @@ public class OozieCLI {
         }
 
         try {
-            if (options.contains(JMS_INFO_OPTION)) {
-                printJMSInfo(wc.getJMSConnectionInfo(commandLine.getOptionValue(JMS_INFO_OPTION)));
-            }
             if (options.contains(SUBMIT_OPTION)) {
                 System.out.println(JOB_ID_PREFIX + wc.submit(getConfiguration(wc, commandLine)));
             }
@@ -1084,10 +1078,6 @@ public class OozieCLI {
         }
     }
 
-    private void printJMSInfo (JMSConnectionInfo jmsInfo) {
-        System.out.println("Topic Name         : " + jmsInfo.getTopicName());
-        System.out.println("JNDI Properties    : " + jmsInfo.getJNDIProperties());
-    }
 
     private void printWorkflowAction(WorkflowAction action, String timeZoneId, boolean verbose) {
         System.out.println("ID : " + maskIfNull(action.getId()));

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java Wed May  8 17:59:52 2013
@@ -19,6 +19,8 @@ package org.apache.oozie.client;
 
 import java.util.Properties;
 
+import org.apache.oozie.client.event.Event;
+
 /**
  * JMS connection related information
  *
@@ -26,10 +28,17 @@ import java.util.Properties;
 public interface JMSConnectionInfo {
 
     /**
-     * Retrieve the JMS topic name
-     * @return the topic name
+     * Get the topic prefix for a JMS topic
+     * @return JMS topic prefix
+     */
+    String getTopicPrefix();
+
+    /**
+     * Get the topic pattern given the app type of job
+     * @param appType the appType for a job
+     * @return JMS topic pattern
      */
-    String getTopicName();
+    String getTopicPattern(Event.AppType appType);
 
     /**
      * Retrieve the JNDI properties for establishing connection to JMS server

Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java?rev=1480380&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java Wed May  8 17:59:52 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client;
+
+import java.util.Properties;
+
+/**
+ * Property-level view of the JMS connection info that JsonToBean proxies and adapts into a JMSConnectionInfo
+ *
+ */
+public interface JMSConnectionInfoWrapper {
+
+    /**
+     * Retrieve JNDI properties
+     * @return JNDI properties
+     */
+    Properties getJNDIProperties();
+
+    /**
+     * Get topic prefix
+     * @return the topic prefix
+     */
+    String getTopicPrefix();
+
+    /**
+     * Get topic pattern as Properties
+     * @return TopicPattern properties
+     */
+    Properties getTopicPatternProperties();
+}

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java Wed May  8 17:59:52 2013
@@ -257,7 +257,7 @@ public class OozieClient {
                     if (!array.contains(WS_PROTOCOL_VERSION) && !array.contains(WS_PROTOCOL_VERSION_1)
                             && !array.contains(WS_PROTOCOL_VERSION_0)) {
                         StringBuilder msg = new StringBuilder();
-                        msg.append("Supported version [").append(WS_PROTOCOL_VERSION_1).append(
+                        msg.append("Supported version [").append(WS_PROTOCOL_VERSION).append(
                                 "] or less, Unsupported versions[");
                         String separator = "";
                         for (Object version : array) {
@@ -681,9 +681,8 @@ public class OozieClient {
 
     private class JMSInfo extends ClientCallable<JMSConnectionInfo> {
 
-        JMSInfo(String jobId) {
-            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
-                    RestConstants.JOB_SHOW_JMS_INFO));
+        JMSInfo() {
+            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams());
         }
 
         protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException {
@@ -732,12 +731,11 @@ public class OozieClient {
 
     /**
      * Get the JMS Connection info
-     * @param jobId
      * @return JMSConnectionInfo object
      * @throws OozieClientException
      */
-    public JMSConnectionInfo getJMSConnectionInfo(String jobId) throws OozieClientException {
-        return new JMSInfo(jobId).call();
+    public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException {
+        return new JMSInfo().call();
     }
 
     /**
@@ -800,6 +798,36 @@ public class OozieClient {
     }
 
     /**
+     * Gets the JMS topic name for a particular job
+     * @param jobId given jobId
+     * @return the JMS topic name
+     * @throws OozieClientException
+     */
+    public String getJMSTopicName(String jobId) throws OozieClientException {
+        return new JMSTopic(jobId).call();
+    }
+
+    private class JMSTopic extends ClientCallable<String> {
+
+        JMSTopic(String jobId) {
+            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
+                    RestConstants.JOB_SHOW_JMS_TOPIC));
+        }
+
+        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
+            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
+                Reader reader = new InputStreamReader(conn.getInputStream());
+                JSONObject json = (JSONObject) JSONValue.parse(reader);
+                return (String) json.get(JsonTags.JMS_TOPIC_NAME);
+            }
+            else {
+                handleError(conn);
+            }
+            return null;
+        }
+    }
+
+    /**
      * Get the definition of a workflow job.
      *
      * @param jobId job Id.

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java Wed May  8 17:59:52 2013
@@ -41,7 +41,8 @@ public interface Event {
         WORKFLOW_ACTION,
         COORDINATOR_JOB,
         COORDINATOR_ACTION,
-        BUNDLE_JOB
+        BUNDLE_JOB,
+        BUNDLE_ACTION
     }
 
     /**

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java Wed May  8 17:59:52 2013
@@ -23,11 +23,11 @@ package org.apache.oozie.client.event.jm
  */
 public final class JMSHeaderConstants {
     // JMS Application specific properties for selectors
-    public static final String EVENT_STATUS = "EVENT_STATUS";
-    public static final String APP_NAME = "APP_NAME";
-    public static final String USER = "USER";
-    public static final String MESSAGE_TYPE = "MESSAGE_TYPE";
-    public static final String APP_TYPE = "APP_TYPE";
+    public static final String EVENT_STATUS = "eventStatus";
+    public static final String APP_NAME = "appName";
+    public static final String USER = "user";
+    public static final String MESSAGE_TYPE = "msgType";
+    public static final String APP_TYPE = "appType";
     // JMS Header property
-    public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT";
+    public static final String MESSAGE_FORMAT = "msgFormat";
 }

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java Wed May  8 17:59:52 2013
@@ -31,40 +31,52 @@ import org.apache.oozie.client.event.mes
  * Client utility to convert JMS message to EventMessage object
  */
 public class JMSMessagingUtils {
-
     private static final String DESERIALIZER_PROP = "oozie.msg.deserializer.";
+    private static MessageDeserializer deserializer;
+    private static Properties jmsDeserializerInfo;
+    private static final String CLIENT_PROPERTIES = "oozie_client.properties";
+
+    static {
+        InputStream is = JMSMessagingUtils.class.getClassLoader().getResourceAsStream(CLIENT_PROPERTIES);
+        if (is == null) {
+            System.out.println("Using default JSON Deserializer");
+            deserializer = new JSONMessageDeserializer();
+        }
+        else {
+            jmsDeserializerInfo = new Properties();
+            try {
+                jmsDeserializerInfo.load(is);
+                is.close();
+            }
+            catch (IOException ioe) {
+                throw new RuntimeException("I/O error occured for " + CLIENT_PROPERTIES, ioe);
+            }
+        }
+
+    }
 
     /**
      * Constructs the EventMessage object from JMS message
      *
+     * @param <T> the EventMessage subtype the caller expects to receive
      * @param msg the JMS message
      * @return the EventMessage
      * @throws IOException
      * @throws JMSException
      */
-    public static EventMessage getEventMessage(Message msg) throws IOException, JMSException {
+    public static <T extends EventMessage> T getEventMessage(Message msg) throws IOException, JMSException {
         if (msg == null) {
             throw new IllegalArgumentException("Could not extract EventMessage as JMS message is null");
         }
-        TextMessage textMessage = (TextMessage) msg;
-        String msgFormat = msg.getStringProperty(JMSHeaderConstants.MESSAGE_FORMAT);
-
-        Properties jmsDeserializerInfo = new Properties();
-        InputStream is = JMSMessagingUtils.class.getClassLoader().getResourceAsStream("oozie_client.properties");
-
-        if (is == null) {
-            System.out.println("Using default deserializer");
-            return new JSONMessageDeserializer().getEventMessage(textMessage);
-        }
-        else {
-            jmsDeserializerInfo.load(is);
-            MessageDeserializer deserializer = getDeserializer((String) jmsDeserializerInfo.get(DESERIALIZER_PROP
-                    + msgFormat));
-            return deserializer.getEventMessage(textMessage);
+        if (deserializer == null) {
+            String msgFormat = msg.getStringProperty(JMSHeaderConstants.MESSAGE_FORMAT);
+            deserializer = getDeserializer(msgFormat);
         }
+        return deserializer.getEventMessage(msg);
     }
 
-    private static MessageDeserializer getDeserializer(String deserializerString) {
+    private static MessageDeserializer getDeserializer(String msgFormat) throws IOException {
+        String deserializerString = (String) jmsDeserializerInfo.get(DESERIALIZER_PROP + msgFormat);
         if (deserializerString == null) {
             return new JSONMessageDeserializer();
         }

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java Wed May  8 17:59:52 2013
@@ -19,18 +19,29 @@ package org.apache.oozie.client.event.jm
 
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.TextMessage;
 
+import org.apache.oozie.client.event.Event;
 import org.apache.oozie.client.event.Event.AppType;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.client.event.Event.MessageType;
 import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.EventMessage;
 import org.apache.oozie.client.event.message.WorkflowJobMessage;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * Message deserializer to convert from JSON to java object
  */
 public class JSONMessageDeserializer extends MessageDeserializer {
 
+    static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
+
+    static {
+        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
     @Override
     public <T> T getDeserializedObject(String messageBody, Class<T> clazz) {
         try {
@@ -42,8 +53,9 @@ public class JSONMessageDeserializer ext
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public WorkflowJobMessage setPropertiesForObject(WorkflowJobMessage workflowJobMsg, Message message)
+    public  WorkflowJobMessage  setProperties(WorkflowJobMessage workflowJobMsg, Message message)
             throws JMSException {
         workflowJobMsg.setAppType(AppType.valueOf(message.getStringProperty(JMSHeaderConstants.APP_TYPE)));
         workflowJobMsg.setMessageType(MessageType.valueOf(message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)));
@@ -54,8 +66,9 @@ public class JSONMessageDeserializer ext
 
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public CoordinatorActionMessage setPropertiesForObject(CoordinatorActionMessage coordActionMsg, Message message)
+    public CoordinatorActionMessage setProperties(CoordinatorActionMessage coordActionMsg, Message message)
             throws JMSException {
         coordActionMsg.setAppType(AppType.valueOf(message.getStringProperty(JMSHeaderConstants.APP_TYPE)));
         coordActionMsg.setMessageType(MessageType.valueOf(message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)));

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java Wed May  8 17:59:52 2013
@@ -21,8 +21,6 @@ import org.apache.oozie.client.event.mes
 import org.apache.oozie.client.event.message.EventMessage;
 import org.apache.oozie.client.event.message.WorkflowJobMessage;
 import org.apache.oozie.client.event.Event;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.DeserializationConfig;
 import javax.jms.Message;
 import javax.jms.TextMessage;
 import javax.jms.JMSException;
@@ -31,11 +29,6 @@ import javax.jms.JMSException;
  * Class to deserialize the jms message to java object
  */
 public abstract class MessageDeserializer {
-    static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
-
-    static {
-        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    }
 
     /**
      * Constructs the event message from JMS message
@@ -44,9 +37,10 @@ public abstract class MessageDeserialize
      * @return EventMessage
      * @throws JMSException
      */
-    public EventMessage getEventMessage(Message message) throws JMSException {
-        String appTypeString = message.getStringProperty(JMSHeaderConstants.APP_TYPE);
-        String messageBody = ((TextMessage) message).getText();
+    public <T extends EventMessage> T getEventMessage(Message message) throws JMSException {
+        TextMessage textMessage = (TextMessage) message;
+        String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE);
+        String messageBody = textMessage.getText();
 
         if (appTypeString == null || appTypeString.isEmpty() || messageBody == null || messageBody.isEmpty()) {
             throw new IllegalArgumentException("Could not extract OozieEventMessage. "
@@ -56,11 +50,11 @@ public abstract class MessageDeserialize
         switch (Event.AppType.valueOf(appTypeString)) {
             case WORKFLOW_JOB:
                 WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
-                return setPropertiesForObject(wfJobMsg, message);
+                return setProperties(wfJobMsg, textMessage);
             case COORDINATOR_ACTION:
                 CoordinatorActionMessage caActionMsg = getDeserializedObject(messageBody,
                         CoordinatorActionMessage.class);
-                return setPropertiesForObject(caActionMsg, message);
+                return setProperties(caActionMsg, textMessage);
             default:
                 throw new UnsupportedOperationException("Conversion of " + appTypeString
                         + " to Event message is not supported");
@@ -78,7 +72,7 @@ public abstract class MessageDeserialize
      * @return WorkflowJobMessage
      * @throws JMSException
      */
-    public abstract WorkflowJobMessage setPropertiesForObject(WorkflowJobMessage wfJobMessage, Message message)
+    protected abstract <T extends EventMessage> T setProperties(WorkflowJobMessage wfJobMessage, Message message)
             throws JMSException;
 
     /**
@@ -89,7 +83,7 @@ public abstract class MessageDeserialize
      * @return CoordinatorActionMessage
      * @throws JMSException
      */
-    public abstract CoordinatorActionMessage setPropertiesForObject(CoordinatorActionMessage coordActionMessage,
+    protected abstract <T extends EventMessage> T setProperties(CoordinatorActionMessage coordActionMessage,
             Message message) throws JMSException;
 
 }

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java Wed May  8 17:59:52 2013
@@ -185,7 +185,7 @@ public class JobMessage extends EventMes
      *
      * @param properties the jms selector key value pair
      */
-    public void setMessageProperties(Map<String, String> properties) {
+    void setMessageProperties(Map<String, String> properties) {
         jmsMessageProperties = properties;
     }
 

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java Wed May  8 17:59:52 2013
@@ -196,7 +196,10 @@ public interface JsonTags {
     public static final String TIME_ZOME_DISPLAY_NAME = "timezoneDisplayName";
     public static final String TIME_ZONE_ID = "timezoneId";
 
-    public static final String JMS_TOPIC_NAME = "jmsTopic";
+    public static final String JMS_TOPIC_PATTERN = "jmsTopicPattern";
     public static final String JMS_JNDI_PROPERTIES = "jmsJNDIProps";
+    public static final String JMS_TOPIC_PREFIX = "jmsTopicPrefix";
+
+    public static final String JMS_TOPIC_NAME = "jmsTopicName";
 
 }

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java Wed May  8 17:59:52 2013
@@ -22,8 +22,10 @@ import org.apache.oozie.client.BundleJob
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.JMSConnectionInfo;
+import org.apache.oozie.client.JMSConnectionInfoWrapper;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event.AppType;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -184,8 +186,9 @@ public class JsonToBean {
         BULK_RESPONSE.put("getCoordinator", new Property(JsonTags.BULK_RESPONSE_COORDINATOR, CoordinatorJob.class, true));
         BULK_RESPONSE.put("getAction", new Property(JsonTags.BULK_RESPONSE_ACTION, CoordinatorAction.class, true));
 
-        JMS_CONNECTION_INFO.put("getTopicName", new Property(JsonTags.JMS_TOPIC_NAME, String.class));
+        JMS_CONNECTION_INFO.put("getTopicPatternProperties", new Property(JsonTags.JMS_TOPIC_PATTERN, Properties.class));
         JMS_CONNECTION_INFO.put("getJNDIProperties", new Property(JsonTags.JMS_JNDI_PROPERTIES, Properties.class));
+        JMS_CONNECTION_INFO.put("getTopicPrefix", new Property(JsonTags.JMS_TOPIC_PREFIX, String.class));
     }
 
     /**
@@ -365,8 +368,26 @@ public class JsonToBean {
      * @return a coordinator job bean populated with the JSON object values.
      */
     public static JMSConnectionInfo createJMSConnectionInfo(JSONObject json) {
-        return (JMSConnectionInfo) Proxy.newProxyInstance(JsonToBean.class.getClassLoader(),
-                new Class[] { JMSConnectionInfo.class }, new JsonInvocationHandler(JMS_CONNECTION_INFO, json));
+        final JMSConnectionInfoWrapper jmsInfo = (JMSConnectionInfoWrapper) Proxy.newProxyInstance(
+                JsonToBean.class.getClassLoader(), new Class[] { JMSConnectionInfoWrapper.class },
+                new JsonInvocationHandler(JMS_CONNECTION_INFO, json));
+
+        return new JMSConnectionInfo() {
+            @Override
+            public String getTopicPrefix() {
+                return jmsInfo.getTopicPrefix();
+            }
+
+            @Override
+            public String getTopicPattern(AppType appType) {
+                return (String)jmsInfo.getTopicPatternProperties().get(appType.name());
+            }
+
+            @Override
+            public Properties getJNDIProperties() {
+                return jmsInfo.getJNDIProperties();
+            }
+        };
     }
 
     /**

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java Wed May  8 17:59:52 2013
@@ -151,5 +151,7 @@ public interface RestConstants {
 
     public static final String ADMIN_TIME_ZONES_RESOURCE = "available-timezones";
 
-    public static final String JOB_SHOW_JMS_INFO = "jmsinfo";
+    public static final String ADMIN_JMS_INFO = "jmsinfo";
+
+    public static final String JOB_SHOW_JMS_TOPIC = "jmstopic";
 }

Modified: oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java (original)
+++ oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java Wed May  8 17:59:52 2013
@@ -21,8 +21,10 @@ import junit.framework.TestCase;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.JMSConnectionInfo;
+import org.apache.oozie.client.JMSConnectionInfoWrapper;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -318,7 +320,12 @@ public class TestJsonToBean extends Test
 
     private JSONObject createJMSInfoJSONObject(){
         JSONObject json = new JSONObject();
-        json.put(JsonTags.JMS_TOPIC_NAME, "topic");
+        json.put(JsonTags.JMS_TOPIC_PREFIX, "topicPrefix");
+        Properties topicProps = new Properties();
+        topicProps.put(Event.AppType.WORKFLOW_JOB, "wfTopic");
+        topicProps.put(Event.AppType.WORKFLOW_ACTION, "wfTopic");
+        topicProps.put(Event.AppType.COORDINATOR_ACTION, "coordTopic");
+        json.put(JsonTags.JMS_TOPIC_PATTERN, JSONValue.toJSONString(topicProps));
         Properties props = new Properties();
         props.put("k1", "v1");
         props.put("k2", "v2");
@@ -329,10 +336,14 @@ public class TestJsonToBean extends Test
     public void testParseJMSInfo() {
         JSONObject json = createJMSInfoJSONObject();
         JMSConnectionInfo jmsDetails = JsonToBean.createJMSConnectionInfo(json);
-        assertEquals("topic", jmsDetails.getTopicName());
+        assertEquals("topicPrefix", jmsDetails.getTopicPrefix());
+        assertEquals("wfTopic", jmsDetails.getTopicPattern(Event.AppType.WORKFLOW_JOB));
+        assertEquals("wfTopic", jmsDetails.getTopicPattern(Event.AppType.WORKFLOW_ACTION));
+        assertEquals("coordTopic", jmsDetails.getTopicPattern(Event.AppType.COORDINATOR_ACTION));
         Properties jmsProps = jmsDetails.getJNDIProperties();
         assertNotNull(jmsDetails.getJNDIProperties());
         assertEquals("v1", jmsProps.get("k1"));
+        assertEquals("v2", jmsProps.get("k2"));
 
     }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java Wed May  8 17:59:52 2013
@@ -23,9 +23,9 @@ import java.io.Writer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.JMSInfoXCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JMSTopicService;
+import org.apache.oozie.service.Services;
 
 public abstract class BaseEngine {
     public static final String USE_XCOMMAND = "oozie.useXCommand";
@@ -197,19 +197,28 @@ public abstract class BaseEngine {
      */
     public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
 
+
     /**
-     * Return the jms info about a job.
+     * Return the JMS topic name for the job.
      *
      * @param jobId job Id.
-     * @return the JMS info bean
+     * @return String the topic name
      * @throws DagEngineException thrown if the jms info could not be obtained.
      */
-    public JMSConnectionInfoBean getJMSConnectionInfo(String jobId) throws DagEngineException {
-        try {
-                return new JMSInfoXCommand(jobId).call();
+    public String getJMSTopicName(String jobId) throws DagEngineException {
+        JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+        if (jmsTopicService != null) {
+            try {
+                return jmsTopicService.getTopic(jobId);
+            }
+            catch (JPAExecutorException e) {
+               throw new DagEngineException(ErrorCode.E1602, e);
+            }
         }
-        catch (CommandException ex) {
-            throw new DagEngineException(ex);
+        else {
+            throw new DagEngineException(ErrorCode.E1602,
+                    "JMSTopicService is not initialized. JMS notification "
+                            + "may not be enabled");
         }
     }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java Wed May  8 17:59:52 2013
@@ -235,6 +235,7 @@ public enum ErrorCode {
     E1501(XLog.STD, "Error in getting HCat Access [{0}]"),
 
     E1601(XLog.STD, "Cannot retrieve JMS connection info [{0}]"),
+    E1602(XLog.STD, "Cannot retrieve Topic name [{0}]"),
 
     ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java Wed May  8 17:59:52 2013
@@ -19,17 +19,19 @@ package org.apache.oozie.client.rest;
 
 import java.util.Properties;
 
-import org.apache.oozie.client.JMSConnectionInfo;
+import org.apache.oozie.client.JMSConnectionInfoWrapper;
+import org.apache.oozie.client.rest.JsonTags;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 /**
  * JMS connection info bean representing the JMS related information for a job
  *
  */
-public class JMSConnectionInfoBean implements JMSConnectionInfo, JsonBean {
+public class JMSConnectionInfoBean implements JsonBean, JMSConnectionInfoWrapper {
 
     private Properties JNDIProperties;
-    private String topicName;
+    private String topicPrefix;
+    private Properties topicProperties;
 
 
     @Override
@@ -38,14 +40,6 @@ public class JMSConnectionInfoBean imple
     }
 
     /**
-     * Set the topic name
-     * @param topicName
-     */
-    public void setTopicName(String topicName) {
-        this.topicName = topicName;
-    }
-
-    /**
      * Set the JNDI properties for jms connection
      * @param JNDIProperties
      */
@@ -53,12 +47,6 @@ public class JMSConnectionInfoBean imple
         this.JNDIProperties = JNDIProperties;
     }
 
-    @Override
-    public String getTopicName() {
-        return topicName;
-    }
-
-    @Override
     public Properties getJNDIProperties() {
         return JNDIProperties;
     }
@@ -68,8 +56,35 @@ public class JMSConnectionInfoBean imple
     public JSONObject toJSONObject(String timeZoneId) {
         JSONObject json = new JSONObject();
         json.put(JsonTags.JMS_JNDI_PROPERTIES, JSONValue.toJSONString(JNDIProperties));
-        json.put(JsonTags.JMS_TOPIC_NAME, topicName);
+        json.put(JsonTags.JMS_TOPIC_PATTERN, JSONValue.toJSONString(topicProperties));
+        json.put(JsonTags.JMS_TOPIC_PREFIX, topicPrefix);
         return json;
     }
 
+    @Override
+    public String getTopicPrefix() {
+       return topicPrefix;
+    }
+
+    /**
+     * Sets the topic prefix
+     * @param topicPrefix
+     */
+    public void setTopicPrefix(String topicPrefix) {
+        this.topicPrefix = topicPrefix;
+    }
+
+    /**
+     * Set the topic pattern properties
+     * @param topicProperties
+     */
+    public void setTopicPatternProperties(Properties topicProperties) {
+        this.topicProperties = topicProperties;
+    }
+
+    @Override
+    public Properties getTopicPatternProperties() {
+        return topicProperties;
+    }
+
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java Wed May  8 17:59:52 2013
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command;
-
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.jms.DefaultJMSServerInfo;
-import org.apache.oozie.jms.JMSJobEventListener;
-import org.apache.oozie.jms.JMSServerInfo;
-import org.apache.oozie.service.JMSAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Command to create and retrieve the JMS ConnectionInfo bean
- *
- */
-public class JMSInfoXCommand extends XCommand<JMSConnectionInfoBean> {
-
-    private final String jobId;
-    private final Configuration conf = Services.get().getConf();
-
-    /**
-     * Constructor for creating the JMSInfoXcommand
-     * @param jobId
-     */
-    public JMSInfoXCommand(String jobId) {
-        super("jms_info", "jms_info", 0);
-        this.jobId = jobId;
-    }
-
-    protected JMSConnectionInfoBean execute() throws CommandException {
-        String connectionProperties = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
-        Class<?> defaultClazz = conf.getClass(JMSAccessorService.JMS_PRODUCER_CONNECTION_INFO_IMPL, DefaultJMSServerInfo.class);
-        JMSServerInfo jmsServerInfo = null;
-        try {
-            if (defaultClazz == DefaultJMSServerInfo.class) {
-                jmsServerInfo = new DefaultJMSServerInfo();
-            }
-            else {
-                jmsServerInfo = (JMSServerInfo) ReflectionUtils.newInstance(defaultClazz, null);
-            }
-            JMSConnectionInfoBean jmsBean = jmsServerInfo.getJMSConnectionInfoBean(connectionProperties, jobId);
-            return jmsBean;
-        }
-        catch (Exception e) {
-            throw new CommandException(ErrorCode.E1601, e.getMessage(), e);
-        }
-
-    }
-
-    @Override
-    protected boolean isLockRequired() {
-        return false;
-    }
-
-    @Override
-    public String getEntityKey() {
-        return jobId;
-    }
-
-    @Override
-    protected void loadState() throws CommandException {
-    }
-
-    @Override
-    protected void verifyPrecondition() throws CommandException, PreconditionException {
-    }
-
-}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java Wed May  8 17:59:52 2013
@@ -30,7 +30,7 @@ import org.apache.oozie.service.Services
 public class MessageFactory {
 
     public static final String OOZIE_MESSAGE_FORMAT = Services.get().getConf().get("message.format", "json");
-    public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.message.serialize.";
+    public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.jms.serialize.";
 
     private static class MessageSerializerHolder {
         private static String messageSerializerInstance = Services

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java Wed May  8 17:59:52 2013
@@ -100,7 +100,7 @@ public interface ConnectionContext {
      * @return
      * @throws JMSException
      */
-    public ThreadLocal<Session> createThreadLocalSession(final int sessionOpts);
+    public Session createThreadLocalSession(final int sessionOpts) throws JMSException;
 
     /**
      * Closes the connection

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java Wed May  8 17:59:52 2013
@@ -38,7 +38,6 @@ public class DefaultConnectionContext im
     protected Connection connection;
     protected String connectionFactoryName;
     private static XLog LOG = XLog.getLog(ConnectionContext.class);
-    private int sessionOpts;
 
     @Override
     public void createConnection(Properties props) throws NamingException, JMSException {
@@ -58,7 +57,6 @@ public class DefaultConnectionContext im
                     LOG.error("Error in JMS connection", je);
                 }
             });
-
         }
         catch (JMSException e1) {
             LOG.error(e1.getMessage(), e1);
@@ -69,6 +67,9 @@ public class DefaultConnectionContext im
                 catch (Exception e2) {
                     LOG.error(e2.getMessage(), e2);
                 }
+                finally {
+                    connection = null;
+                }
             }
             throw e1;
         }
@@ -86,6 +87,9 @@ public class DefaultConnectionContext im
 
     @Override
     public Session createSession(int sessionOpts) throws JMSException {
+        if (connection == null) {
+            throw new JMSException ("Connection is not initialized");
+        }
         return connection.createSession(false, sessionOpts);
     }
 
@@ -105,32 +109,34 @@ public class DefaultConnectionContext im
 
     @Override
     public void close() {
-        try {
-            connection.close();
-        }
-        catch (JMSException e) {
-            LOG.warn("Unable to close the connection " + connection, e);
-        }
-    }
-
-    private final ThreadLocal<Session> th =  new ThreadLocal<Session>() {
-        protected Session initialValue() {
+        if (connection != null) {
             try {
-                return connection.createSession(false, sessionOpts);
+                connection.close();
             }
             catch (JMSException e) {
-                throw new RuntimeException("Session couldn't be created", e);
+                LOG.warn("Unable to close the connection " + connection, e);
+            }
+            finally {
+                connection = null;
             }
         }
-    };
+        th = null;
+    }
+
+    private ThreadLocal<Session> th = new ThreadLocal<Session>();
 
     @Override
-    public ThreadLocal<Session> createThreadLocalSession(final int sessionOpts) {
-       this.sessionOpts = sessionOpts;
-       return th;
+    public Session createThreadLocalSession(final int sessionOpts) throws JMSException {
+        Session session = th.get();
+        if (session != null) {
+            return session;
+        }
+        th.remove();
+        session = createSession(sessionOpts);
+        th.set(session);
+        return session;
     }
 
-
     @Override
     public MessageConsumer createConsumer(Session session, String topicName, String selector) throws JMSException {
         Topic topic = session.createTopic(topicName);

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java Wed May  8 17:59:52 2013
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.jms;
-
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.service.JMSTopicService;
-import org.apache.oozie.service.Services;
-
-/**
- * Default implementation to retrieve the JMS ConnectionInfo bean
- */
-public class DefaultJMSServerInfo implements JMSServerInfo {
-    private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
-
-    @Override
-    public JMSConnectionInfoBean getJMSConnectionInfoBean(String connectionProperties, String jobId) throws Exception {
-        JMSConnectionInfoBean jmsBean = new JMSConnectionInfoBean();
-        JMSConnectionInfo jmsInfo = new JMSConnectionInfo(connectionProperties);
-        jmsBean.setJNDIProperties(jmsInfo.getJNDIProperties());
-        if (jmsTopicService != null) {
-            jmsBean.setTopicName(jmsTopicService.getTopic(jobId));
-        }
-        else {
-            throw new Exception(
-                    "Topic name cannot be retrieved as JMSTopicService is not initialized. JMS notification"
-                            + "may not be enabled");
-        }
-        return jmsBean;
-    }
-
-}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java Wed May  8 17:59:52 2013
@@ -47,9 +47,8 @@ public class JMSExceptionListener implem
     public void onException(JMSException exception) {
         LOG.warn("Received JMSException for [{0}]", connInfo, exception);
         connCtxt.close();
-        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-        jmsService.removeConnInfo(connInfo);
         if (retry) {
+            JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
             jmsService.reestablishConnection(connInfo);
         }
     }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java Wed May  8 17:59:52 2013
@@ -20,6 +20,7 @@ package org.apache.oozie.jms;
 import java.util.Map;
 
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -50,7 +51,7 @@ public class JMSJobEventListener extends
     private JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
     private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
     private JMSConnectionInfo connInfo;
-    public static final String JMS_CONNECTION_PROPERTIES = "oozie.jms.producer.notification.connection";
+    public static final String JMS_CONNECTION_PROPERTIES = "oozie.jms.producer.connection.properties";
     public static final String JMS_SESSION_OPTS = "oozie.jms.producer.session.opts";
     public static final String JMS_DELIVERY_MODE = "oozie.jms.delivery.mode";
     public static final String JMS_EXPIRATION_DATE = "oozie.jms.expiration.date";
@@ -75,27 +76,24 @@ public class JMSJobEventListener extends
 
     protected void sendMessage(Map<String, String> messageProperties, String messageBody, String topicName,
             String messageFormat) {
-        jmsContext = jmsService.createConnectionContext(connInfo, false);
-        ThreadLocal<Session> threadLocal = null;
-        try {
-            threadLocal = jmsContext.createThreadLocalSession(jmsSessionOpts);
-            Session session = threadLocal.get();
-            TextMessage textMessage = session.createTextMessage(messageBody);
-            for (Map.Entry<String, String> property : messageProperties.entrySet()) {
-                textMessage.setStringProperty(property.getKey(), property.getValue());
+        jmsContext = jmsService.createProducerConnectionContext(connInfo);
+        if (jmsContext != null) {
+            try {
+                Session session = jmsContext.createThreadLocalSession(jmsSessionOpts);
+                TextMessage textMessage = session.createTextMessage(messageBody);
+                for (Map.Entry<String, String> property : messageProperties.entrySet()) {
+                    textMessage.setStringProperty(property.getKey(), property.getValue());
+                }
+                textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat);
+                LOG.trace("Event related JMS message [{0}]", textMessage.toString());
+                MessageProducer producer = jmsContext.createProducer(session, topicName);
+                producer.setDeliveryMode(jmsDeliveryMode);
+                producer.setTimeToLive(jmsExpirationDate);
+                producer.send(textMessage);
+                producer.close();
             }
-            textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat);
-            LOG.trace("Event related JMS message [{0}]", textMessage.getText());
-            MessageProducer producer = jmsContext.createProducer(session, topicName);
-            producer.setDeliveryMode(jmsDeliveryMode);
-            producer.setTimeToLive(jmsExpirationDate);
-            producer.send(textMessage);
-            producer.close();
-        }
-        catch (Exception jmse) {
-            LOG.error("Exception happened while sending event related jms message", jmse);
-            if (threadLocal != null) {
-                threadLocal.remove();
+            catch (JMSException jmse) {
+                LOG.error("Exception happened while sending event related jms message", jmse);
             }
         }
 
@@ -104,17 +102,17 @@ public class JMSJobEventListener extends
     @Override
     public void onWorkflowJobEvent(WorkflowJobEvent event) {
         WorkflowJobMessage wfJobMessage = MessageFactory.createWorkflowJobMessage(event);
-        serializeAndSendJMSMessage(wfJobMessage, getTopic(event));
+        serializeJMSMessage(wfJobMessage, getTopic(event));
 
     }
 
     @Override
     public void onCoordinatorActionEvent(CoordinatorActionEvent event) {
         CoordinatorActionMessage coordActionMessage = MessageFactory.createCoordinatorActionMessage(event);
-        serializeAndSendJMSMessage(coordActionMessage, getTopic(event));
+        serializeJMSMessage(coordActionMessage, getTopic(event));
     }
 
-    private void serializeAndSendJMSMessage(JobMessage jobMessage, String topicName) {
+    private void serializeJMSMessage(JobMessage jobMessage, String topicName) {
         MessageSerializer serializer = MessageFactory.getMessageSerializer();
         String messageBody = serializer.getSerializedObject(jobMessage);
         sendMessage(jobMessage.getMessageProperties(), messageBody, topicName, serializer.getMessageFormat());
@@ -140,27 +138,18 @@ public class JMSJobEventListener extends
 
     @Override
     public void onWorkflowActionEvent(WorkflowActionEvent wae) {
-        LOG.warn("Sending jms message related to workflow action is not supported");
     }
 
     @Override
     public void onCoordinatorJobEvent(CoordinatorJobEvent wje) {
-        LOG.warn("Sending jms message related to coordinator job is not supported");
     }
 
     @Override
     public void onBundleJobEvent(BundleJobEvent wje) {
-        LOG.warn("Sending jms message related to bundle job is not supported");
     }
 
     @Override
     public void destroy() {
-        if (jmsContext != null) {
-            LOG.debug("Closing JMS connection");
-            jmsContext.close();
-            jmsService.removeConnInfo(connInfo);
-        }
-
-    }
+}
 
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java Wed May  8 17:59:52 2013
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.jms;
-
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-
-/**
- * Get the JMS ConnectionInfoBean
- */
-public interface JMSServerInfo {
-
-    /**
-     * Retrive the conn info bean using conn properties and job id
-     * @param connectionProperties the jms producer conn properties
-     * @param jobId the job id
-     * @return
-     * @throws Exception
-     */
-    public JMSConnectionInfoBean getJMSConnectionInfoBean(String connectionProperties, String jobId) throws Exception;
-
-}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java Wed May  8 17:59:52 2013
@@ -54,7 +54,6 @@ public class JMSAccessorService implemen
     public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay";
     public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier";
     public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts";
-    public static final String JMS_PRODUCER_CONNECTION_INFO_IMPL = CONF_PREFIX + "jms.producer.connection.info.impl";
     private static XLog LOG;
 
     private Configuration conf;
@@ -62,6 +61,7 @@ public class JMSAccessorService implemen
     private int retryInitialDelay;
     private int retryMultiplier;
     private int retryMaxAttempts;
+    private ConnectionContext jmsProducerConnContext;
 
     /**
      * Map of JMS connection info to established JMS Connection
@@ -106,7 +106,7 @@ public class JMSAccessorService implemen
                 if (!topicsMap.containsKey(topic)) {
                     synchronized (topicsMap) {
                         if (!topicsMap.containsKey(topic)) {
-                            ConnectionContext connCtxt = createConnectionContext(connInfo, true);
+                            ConnectionContext connCtxt = createConnectionContext(connInfo);
                             if (connCtxt == null) {
                                 queueTopicForRetry(connInfo, topic, msgHandler);
                                 return;
@@ -255,13 +255,13 @@ public class JMSAccessorService implemen
         }
     }
 
-    public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo, boolean retry) {
+    public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
         ConnectionContext connCtxt = connectionMap.get(connInfo);
         if (connCtxt == null) {
             try {
                 connCtxt = getConnectionContextImpl();
                 connCtxt.createConnection(connInfo.getJNDIProperties());
-                connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, retry));
+                connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, true));
                 connectionMap.put(connInfo, connCtxt);
                 LOG.info("Connection established to JMS Server for [{0}]", connInfo);
             }
@@ -273,6 +273,30 @@ public class JMSAccessorService implemen
         return connCtxt;
     }
 
+    /**
+     * Returns the single JMS connection context used for producing messages, creating it when it is
+     * missing or its connection is not initialized.
+     * @param connInfo JMS connection info supplying the JNDI properties
+     * @return the producer connection context, or null if the connection could not be established
+     */
+    public synchronized ConnectionContext createProducerConnectionContext(JMSConnectionInfo connInfo) {
+        // Check under the lock: the field is not volatile, so an unlocked read may see a stale context.
+        if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) {
+            try {
+                jmsProducerConnContext = getConnectionContextImpl();
+                jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
+                jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo,
+                        jmsProducerConnContext, false));
+                LOG.info("Connection established to JMS Server for [{0}]", connInfo);
+            }
+            catch (Exception e) {
+                LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
+                return null;
+            }
+        }
+        return jmsProducerConnContext;
+    }
+
     private ConnectionContext getConnectionContextImpl() {
         Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, DefaultConnectionContext.class);
         ConnectionContext connCtx = null;
@@ -297,23 +321,15 @@ public class JMSAccessorService implemen
     @Override
     public void destroy() {
         LOG.info("Destroying JMSAccessor service ");
-        LOG.info("Closing JMS sessions");
-        for (Map<String, MessageReceiver> topicsMap : receiversMap.values()) {
-            for (MessageReceiver receiver : topicsMap.values()) {
-                try {
-                    receiver.getSession().close();
-                }
-                catch (JMSException e) {
-                    LOG.warn("Unable to close session " + receiver.getSession(), e);
-                }
-            }
-        }
         receiversMap.clear();
 
         LOG.info("Closing JMS connections");
         for (ConnectionContext conn : connectionMap.values()) {
             conn.close();
         }
+        if (jmsProducerConnContext != null) {
+            jmsProducerConnContext.close();
+        }
         connectionMap.clear();
     }
 
@@ -322,16 +338,13 @@ public class JMSAccessorService implemen
         return JMSAccessorService.class;
     }
 
-    public void removeConnInfo(JMSConnectionInfo connInfo){
-        connectionMap.remove(connInfo);
-    }
-
     /**
      * Reestablish connection for the given JMS connect information
      * @param connInfo JMS connection info
      */
     public void reestablishConnection(JMSConnectionInfo connInfo) {
         // Queue the connection and topics for retry
+        connectionMap.remove(connInfo);
         ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
         Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo);
         if (listeningTopicsMap != null) {
@@ -339,12 +352,6 @@ public class JMSAccessorService implemen
             for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) {
                 MessageReceiver receiver = topicEntry.getValue();
                 retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler());
-                try {
-                    receiver.getSession().close();
-                }
-                catch (JMSException e) {
-                    LOG.warn("Unable to close session " + receiver.getSession(), e);
-                }
             }
         }
     }
@@ -366,7 +373,7 @@ public class JMSAccessorService implemen
         LOG.info("Attempting retry of connection [{0}]", connInfo);
         connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
         connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier);
-        ConnectionContext connCtxt = createConnectionContext(connInfo, true);
+        ConnectionContext connCtxt = createConnectionContext(connInfo);
         boolean shouldRetry = false;
         if (connCtxt == null) {
             shouldRetry = true;

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java Wed May  8 17:59:52 2013
@@ -21,9 +21,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.Event.AppType;
 import org.apache.oozie.event.CoordinatorActionEvent;
 import org.apache.oozie.event.WorkflowJobEvent;
 import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor;
@@ -32,6 +35,7 @@ import org.apache.oozie.executor.jpa.JPA
 import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor;
 import org.apache.oozie.util.XLog;
 
+
 /**
  * JMS Topic service to retrieve topic names from events or job id
  *
@@ -252,6 +256,30 @@ public class JMSTopicService implements 
         return topicName;
     }
 
+    /**
+     * Builds the app-type to topic-pattern mapping: the job and action app types of each job type get
+     * that job type's entry in topicMap, or defaultTopicName when the entry is absent.
+     */
+    public Properties getTopicPatternProperties() {
+        final String workflowTopic = topicMap.get(JobType.WORKFLOW.value);
+        final String coordTopic = topicMap.get(JobType.COORDINATOR.value);
+        final String bundleJobTopic = topicMap.get(JobType.BUNDLE.value);
+
+        final Properties patterns = new Properties();
+        patterns.put(AppType.WORKFLOW_JOB, workflowTopic == null ? defaultTopicName : workflowTopic);
+        patterns.put(AppType.WORKFLOW_ACTION, workflowTopic == null ? defaultTopicName : workflowTopic);
+        patterns.put(AppType.COORDINATOR_JOB, coordTopic == null ? defaultTopicName : coordTopic);
+        patterns.put(AppType.COORDINATOR_ACTION, coordTopic == null ? defaultTopicName : coordTopic);
+        patterns.put(AppType.BUNDLE_JOB, bundleJobTopic == null ? defaultTopicName : bundleJobTopic);
+        patterns.put(AppType.BUNDLE_ACTION, bundleJobTopic == null ? defaultTopicName : bundleJobTopic);
+
+        return patterns;
+    }
+
+    public String getTopicPrefix() {
+        return topicPrefix;
+    }
+
     @Override
     public void destroy() {
         topicMap.clear();

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java Wed May  8 17:59:52 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,6 +26,7 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.oozie.BuildInfo;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.service.AuthorizationException;
@@ -74,6 +75,18 @@ public abstract class BaseAdminServlet e
         }*/
     }
 
+
+    /**
+     * Get the JMS connection info bean that is sent as the JSON response for the ADMIN_JMS_INFO resource
+     * @param request the servlet request being served
+     * @param response the servlet response for that request
+     * @throws XServletException if the info cannot be provided, e.g. it is not supported in the API version
+     * @throws IOException if an implementation encounters an I/O failure
+     */
+    abstract JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
+            throws XServletException, IOException;
+
+
     /**
      * Return safemode state, instrumentation, configuration, osEnv or
      * javaSysProps
@@ -123,8 +136,16 @@ public abstract class BaseAdminServlet e
             json.put(JsonTags.AVAILABLE_TIME_ZONES, availableTimeZonesToJsonArray());
             sendJsonResponse(response, HttpServletResponse.SC_OK, json);
         }
+        else if (resource.equals(RestConstants.ADMIN_JMS_INFO)) {
+            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null ? "GMT" : request
+                    .getParameter(RestConstants.TIME_ZONE_PARAM);
+            JsonBean jmsBean = getJMSConnectionInfo(request, response);
+            sendJsonResponse(response, HttpServletResponse.SC_OK, jmsBean, timeZoneId);
+        }
+
     }
 
+
     @Override
     protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
             IOException {
@@ -182,7 +203,7 @@ public abstract class BaseAdminServlet e
             throws XServletException;
 
     protected abstract void getQueueDump(JSONObject json) throws XServletException;
-    
+
     private static final JSONArray GMTOffsetTimeZones = new JSONArray();
     static {
         prepareGMTOffsetTimeZones();

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java Wed May  8 17:59:52 2013
@@ -30,6 +30,7 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.XOozieClient;
 import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.service.AuthorizationException;
 import org.apache.oozie.service.AuthorizationService;
@@ -236,11 +237,13 @@ public abstract class BaseJobServlet ext
             sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId);
         }
 
-        else if (show.equals(RestConstants.JOB_SHOW_JMS_INFO)){
+        else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) {
             stopCron();
-            JsonBean jmsBean = getJMSConnectionInfo (request, response);
+            String jmsTopicName = getJMSTopicName(request, response);
+            JSONObject json = new JSONObject();
+            json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName);
             startCron();
-            sendJsonResponse(response, HttpServletResponse.SC_OK, jmsBean, timeZoneId);
+            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
         }
 
         else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
@@ -380,14 +383,13 @@ public abstract class BaseJobServlet ext
     abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
             throws XServletException, IOException;
 
-
     /**
-     * abstract method to get JMS connection details for a job
+     * abstract method to get JMS topic name for a job
      * @param request
      * @param response
      * @throws XServletException
      * @throws IOException
      */
-    abstract JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
+    abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response)
             throws XServletException, IOException;
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java Wed May  8 17:59:52 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.servlet;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -25,11 +26,13 @@ import javax.servlet.http.HttpServletRes
 
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.service.Services;
 import org.json.simple.JSONObject;
 
+@SuppressWarnings("unchecked")
 public class V0AdminServlet extends BaseAdminServlet {
     private static final long serialVersionUID = 1L;
     private static final String INSTRUMENTATION_NAME = "v0admin";
@@ -109,4 +112,10 @@ public class V0AdminServlet extends Base
     protected void getQueueDump(JSONObject json) throws XServletException {
         throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0301, "cannot get queue dump");
     }
+
+    @Override
+    protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0");
+    }
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java Wed May  8 17:59:52 2013
@@ -211,7 +211,8 @@ public class V0JobServlet extends BaseJo
     }
 
     @Override
-    protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
-            throws XServletException, IOException {
-        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);}
+    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
+    }
 }
\ No newline at end of file

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java Wed May  8 17:59:52 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,18 +17,26 @@
  */
 package org.apache.oozie.servlet;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.jms.JMSConnectionInfo;
+import org.apache.oozie.jms.JMSJobEventListener;
 import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.JMSTopicService;
 import org.apache.oozie.service.Services;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
@@ -37,7 +45,7 @@ public class V1AdminServlet extends Base
 
     private static final long serialVersionUID = 1L;
     private static final String INSTRUMENTATION_NAME = "v1admin";
-    private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[8];
+    private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[9];
 
     static {
         RESOURCES_INFO[0] = new ResourceInfo(RestConstants.ADMIN_STATUS_RESOURCE, Arrays.asList("PUT", "GET"),
@@ -57,13 +65,20 @@ public class V1AdminServlet extends Base
                 Collections.EMPTY_LIST);
         RESOURCES_INFO[7] = new ResourceInfo((RestConstants.ADMIN_TIME_ZONES_RESOURCE), Arrays.asList("GET"),
                 Collections.EMPTY_LIST);
+        RESOURCES_INFO[8] = new ResourceInfo((RestConstants.ADMIN_JMS_INFO), Arrays.asList("GET"),
+                Collections.EMPTY_LIST);
+
     }
 
-    public V1AdminServlet() {
-        super(INSTRUMENTATION_NAME, RESOURCES_INFO);
+    protected V1AdminServlet(String name) {
+        super(name, RESOURCES_INFO);
         modeTag = RestConstants.ADMIN_SYSTEM_MODE_PARAM;
     }
 
+    public V1AdminServlet() {
+        this(INSTRUMENTATION_NAME);
+    }
+
     /*
      * (non-Javadoc)
      *
@@ -131,4 +146,11 @@ public class V1AdminServlet extends Base
         json.put(JsonTags.UNIQUE_MAP_DUMP, uniqueDumpArray);
     }
 
+    @Override
+    protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+    }
+
+
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java Wed May  8 17:59:52 2013
@@ -737,20 +737,6 @@ public class V1JobServlet extends BaseJo
         return consoleBase;
     }
 
-    protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response) throws XServletException{
-        JsonBean jmsBean = null;
-        String jobId = getResourceName(request);
-        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
-                getAuthToken(request));
-        try {
-            jmsBean = (JsonBean) dagEngine.getJMSConnectionInfo(jobId);
-        }
-        catch (DagEngineException ex) {
-            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
-        }
-        return jmsBean;
-    }
-
     /**
      * Get wf action info
      *
@@ -1009,4 +995,11 @@ public class V1JobServlet extends BaseJo
             throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
         }
     }
+
+    @Override
+    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException {
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
+    }
+
 }

Added: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java?rev=1480380&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java Wed May  8 17:59:52 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.servlet;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.jms.JMSConnectionInfo;
+import org.apache.oozie.jms.JMSJobEventListener;
+import org.apache.oozie.service.JMSTopicService;
+import org.apache.oozie.service.Services;
+
+/**
+ * V2 admin servlet
+ *
+ */
+public class V2AdminServlet extends V1AdminServlet {
+
+    private static final long serialVersionUID = 1L;
+    private static final String INSTRUMENTATION_NAME = "v2admin";
+
+    public V2AdminServlet() {
+        super(INSTRUMENTATION_NAME);
+    }
+
+    /**
+     * Builds the JMS connection info (JNDI properties, topic prefix and topic patterns) from the
+     * server configuration; the security principal is removed from the returned JNDI properties.
+     * @throws XServletException if the JMS connection property or the JMSTopicService is unavailable
+     */
+    @Override
+    protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
+            throws XServletException, IOException {
+        Configuration conf = Services.get().getConf();
+        JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+        String connectionProperties = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
+        if (connectionProperties == null) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E1601,
+                    "JMS connection property is not defined");
+        }
+        JMSConnectionInfoBean jmsBean = new JMSConnectionInfoBean();
+        JMSConnectionInfo jmsInfo = new JMSConnectionInfo(connectionProperties);
+        Properties jmsInfoProps = jmsInfo.getJNDIProperties();
+        jmsInfoProps.remove("java.naming.security.principal");
+        jmsBean.setJNDIProperties(jmsInfoProps);
+        if (jmsTopicService == null) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E1601,
+                    "JMSTopicService is not initialized. JMS notification may not be enabled");
+        }
+        jmsBean.setTopicPrefix(jmsTopicService.getTopicPrefix());
+        jmsBean.setTopicPatternProperties(jmsTopicService.getTopicPatternProperties());
+        return jmsBean;
+    }
+
+}