You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/05/08 19:59:53 UTC

svn commit: r1480380 [2/2] - in /oozie/trunk: ./ client/src/main/java/org/apache/oozie/cli/ client/src/main/java/org/apache/oozie/client/ client/src/main/java/org/apache/oozie/client/event/ client/src/main/java/org/apache/oozie/client/event/jms/ client...

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java Wed May  8 17:59:52 2013
@@ -17,10 +17,16 @@
  */
 package org.apache.oozie.servlet;
 
+import java.io.IOException;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.oozie.DagEngine;
+import org.apache.oozie.DagEngineException;
 import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.service.DagEngineService;
+import org.apache.oozie.service.Services;
 
 @SuppressWarnings("serial")
 public class V2JobServlet extends V1JobServlet {
@@ -42,4 +48,21 @@ public class V2JobServlet extends V1JobS
         JsonBean actionBean = super.getWorkflowActionBean(request, response);
         return actionBean;
     }
+
+
+    @Override
+    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+            IOException {
+        String topicName;
+        String jobId = getResourceName(request);
+        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
+                getAuthToken(request));
+        try {
+            topicName = dagEngine.getJMSTopicName(jobId);
+        }
+        catch (DagEngineException ex) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+        }
+        return topicName;
+    }
 }

Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Wed May  8 17:59:52 2013
@@ -164,14 +164,14 @@
         bundle job and bundle action
         WORKFLOW=workflow,
         COORDINATOR=coordinator,
-        BUNDLE={jobId}
+        BUNDLE=${jobId}
         For jobs with no defined topic, default topic will be ${username}
         </description>
     </property>
 
     <!-- JMS Producer connection -->
     <property>
-        <name>oozie.jms.producer.notification.connection</name>
+        <name>oozie.jms.producer.connection.properties</name>
         <value>java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616;connectionFactoryNames#ConnectionFactory</value>
     </property>
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java Wed May  8 17:59:52 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,6 +34,7 @@ import org.apache.oozie.servlet.MockDagE
 import org.apache.oozie.servlet.V1AdminServlet;
 import org.apache.oozie.servlet.V1JobServlet;
 import org.apache.oozie.servlet.V1JobsServlet;
+import org.apache.oozie.servlet.V2AdminServlet;
 import org.apache.oozie.servlet.V2JobServlet;
 import org.apache.oozie.util.XConfiguration;
 
@@ -45,15 +46,15 @@ public class TestOozieCLI extends DagSer
         new V1JobServlet();
         new V1JobsServlet();
         new V1AdminServlet();
+        new V2AdminServlet();
         new V2JobServlet();
     }
 
     static final boolean IS_SECURITY_ENABLED = false;
     static final String VERSION = "/v" + OozieClient.WS_PROTOCOL_VERSION;
     static final String[] END_POINTS = {"/versions", VERSION + "/jobs", VERSION + "/job/*", VERSION + "/admin/*"};
-    static final Class[] SERVLET_CLASSES =
- { HeaderTestingVersionServlet.class, V1JobsServlet.class, V1JobServlet.class,
-            V1AdminServlet.class, V2JobServlet.class };
+    static final Class[] SERVLET_CLASSES = { HeaderTestingVersionServlet.class, V1JobsServlet.class,
+            V1JobServlet.class, V1AdminServlet.class, V2JobServlet.class, V2AdminServlet.class };
 
     @Override
     protected void setUp() throws Exception {
@@ -300,7 +301,7 @@ public class TestOozieCLI extends DagSer
 
     /**
      * Check if "-debug" option is accepted at CLI with job run command
-     * 
+     *
      * @throws Exception
      */
     public void testRunWithDebug() throws Exception {
@@ -518,12 +519,12 @@ public class TestOozieCLI extends DagSer
                     MockDagEngineService.JOB_ID + "1" + MockDagEngineService.JOB_ID_END};
                 assertEquals(0, new OozieCLI().run(args));
                 assertEquals(RestConstants.JOB_SHOW_INFO, MockDagEngineService.did);
-                
+
                 args = new String[]{"job", "-oozie", oozieUrl, "-info", MockDagEngineService.JOB_ID + "2" +
                     MockDagEngineService.JOB_ID_END};
                 assertEquals(0, new OozieCLI().run(args));
                 assertEquals(RestConstants.JOB_SHOW_INFO, MockDagEngineService.did);
-                
+
                 args = new String[]{"job", "-oozie", oozieUrl, "-info",
                         MockDagEngineService.JOB_ID + (MockDagEngineService.workflows.size() + 1)};
                 assertEquals(-1, new OozieCLI().run(args));
@@ -545,12 +546,12 @@ public class TestOozieCLI extends DagSer
                         "name=x"};
                 assertEquals(0, new OozieCLI().run(args));
                 assertEquals(RestConstants.JOBS_FILTER_PARAM, MockDagEngineService.did);
-                
+
                 args = new String[]{"jobs", "-timezone", "PST", "-len", "3", "-offset", "2", "-oozie", oozieUrl,
                     "-filter", "name=x"};
                 assertEquals(0, new OozieCLI().run(args));
                 assertEquals(RestConstants.JOBS_FILTER_PARAM, MockDagEngineService.did);
-                
+
                 args = new String[]{"jobs", "-jobtype", "coord",  "-filter", "status=FAILED", "-oozie", oozieUrl};
                 assertEquals(0, new OozieCLI().run(args));
                 assertEquals(RestConstants.JOBS_FILTER_PARAM, MockDagEngineService.did);
@@ -711,7 +712,7 @@ public class TestOozieCLI extends DagSer
             }
         });
     }
-    
+
     public void testInfo() throws Exception {
         String[] args = new String[]{"info"};
         assertEquals(0, new OozieCLI().run(args));

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java Wed May  8 17:59:52 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,6 +32,7 @@ import org.apache.oozie.servlet.V0JobsSe
 import org.apache.oozie.servlet.V1AdminServlet;
 import org.apache.oozie.servlet.V1JobServlet;
 import org.apache.oozie.servlet.V1JobsServlet;
+import org.apache.oozie.servlet.V2AdminServlet;
 import org.apache.oozie.servlet.V2JobServlet;
 
 public class TestWorkflowClient extends DagServletTestCase {
@@ -44,13 +45,15 @@ public class TestWorkflowClient extends 
         new V1AdminServlet();
         new V1JobServlet();
         new V2JobServlet();
+        new V2AdminServlet();
     }
 
     private static final boolean IS_SECURITY_ENABLED = false;
     static final String VERSION = "/v" + OozieClient.WS_PROTOCOL_VERSION;
     static final String[] END_POINTS = {"/versions", VERSION + "/jobs", VERSION + "/job/*", VERSION + "/admin/*"};
-    static final Class[] SERVLET_CLASSES = {HeaderTestingVersionServlet.class, V0JobsServlet.class,
-            V0JobServlet.class, V1AdminServlet.class, V1JobServlet.class, V2JobServlet.class, V1JobsServlet.class};
+    static final Class[] SERVLET_CLASSES = { HeaderTestingVersionServlet.class, V0JobsServlet.class,
+            V0JobServlet.class, V1AdminServlet.class, V2AdminServlet.class, V1JobServlet.class, V2JobServlet.class,
+            V1JobsServlet.class };
 
     protected void setUp() throws Exception {
         super.setUp();

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java Wed May  8 17:59:52 2013
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command;
-
-import java.util.Properties;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.jms.JMSJobEventListener;
-import org.apache.oozie.service.JMSAccessorService;
-import org.apache.oozie.service.JMSTopicService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.workflow.WorkflowInstance;
-import org.junit.Test;
-
-public class TestJMSInfoXCommand extends XDataTestCase {
-
-    private Services services;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        services = new Services();
-        services.init();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        services.destroy();
-        super.tearDown();
-    }
-
-    @Test
-    public void testConnectionNotEnabled() {
-        try {
-            WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-            new JMSInfoXCommand(wfj.getId()).call();
-        }
-        catch (CommandException e) {
-            assertEquals(ErrorCode.E1601, e.getErrorCode());
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
-    }
-
-    @Test
-    public void testJMSConnectionInfo() {
-        try {
-            services.destroy();
-            services = new Services();
-            services.getConf().set(
-                    JMSJobEventListener.JMS_CONNECTION_PROPERTIES,
-                    "java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#"
-                            + localActiveMQBroker + ";" + "connectionFactoryNames#" + "ConnectionFactory");
-            services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName());
-            services.init();
-
-            WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-            JMSInfoXCommand jmsInfoCmd = new JMSInfoXCommand(wfj.getId());
-            JMSConnectionInfoBean jmsBean = jmsInfoCmd.call();
-            assertEquals("test", jmsBean.getTopicName());
-            Properties props = jmsBean.getJNDIProperties();
-            assertEquals(ActiveMQConnFactory, props.get("java.naming.factory.initial"));
-            assertEquals(localActiveMQBroker, props.get("java.naming.provider.url"));
-            assertEquals("ConnectionFactory", props.get("connectionFactoryNames"));
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
-    }
-
-}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java Wed May  8 17:59:52 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.jms;
 
+import javax.jms.JMSException;
 import javax.jms.Session;
 
 import org.apache.hadoop.conf.Configuration;
@@ -54,11 +55,11 @@ public class TestDefaultConnectionContex
     }
 
     @Test
-    public void testThreadLocalSession() {
+    public void testThreadLocalSession() throws JMSException {
         String jmsProps = services.getConf().get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
         JMSConnectionInfo connInfo = new JMSConnectionInfo(jmsProps);
         ConnectionContext jmsContext = Services.get().get(JMSAccessorService.class)
-                .createConnectionContext(connInfo, false);
+                .createConnectionContext(connInfo);
         Thread th = new Thread(new SessionThread(jmsContext));
         th.start();
         try {
@@ -69,25 +70,17 @@ public class TestDefaultConnectionContex
         }
         assertEquals(session1, session2);
 
-        ThreadLocal<Session> threadLocal1 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
-        Session session3 = threadLocal1.get();
-        ThreadLocal<Session> threadLocal2 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
-        Session session4 = threadLocal2.get();
+        Session session3 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
+        Session session4 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
         // As session3 and session4 are created by same threads, they should be
         // equal
         assertTrue(session3.equals(session4));
         // As session1 and session3 are created by diff threads, they shoudn't
         // be equal
         assertFalse(session1.equals(session3));
-        threadLocal1.remove();
-        ThreadLocal<Session> threadLocal3 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
-        Session session5 = threadLocal3.get();
-        assertFalse(session3.equals(session5));
-
     }
 
     class SessionThread implements Runnable {
-
         private ConnectionContext connContext;
 
         SessionThread(ConnectionContext connContext) {
@@ -96,10 +89,14 @@ public class TestDefaultConnectionContex
 
         @Override
         public void run() {
-            ThreadLocal<Session> th1 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
-            session1 = th1.get();
-            ThreadLocal<Session> th2 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
-            session2 = th2.get();
+            try {
+                session1 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
+                session2 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
+            }
+            catch (JMSException e) {
+                e.printStackTrace();
+            }
+
         }
 
     }

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java Wed May  8 17:59:52 2013
@@ -89,7 +89,7 @@ public class TestJMSJobEventListener ext
             wfEventListener.onWorkflowJobEvent(wfe);
             TextMessage message = (TextMessage) consumer.receive(5000);
             assertFalse(message.getText().contains("endTime"));
-            WorkflowJobMessage wfStartMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message);
             assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
             assertEquals(startDate, wfStartMessage.getStartTime());
             assertEquals("wfId1", wfStartMessage.getId());
@@ -122,7 +122,7 @@ public class TestJMSJobEventListener ext
             MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
             wfEventListener.onWorkflowJobEvent(wfe);
             TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfSuccMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message);
             assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
             assertEquals(startDate, wfSuccMessage.getStartTime());
             assertEquals(endDate, wfSuccMessage.getEndTime());
@@ -158,7 +158,7 @@ public class TestJMSJobEventListener ext
             MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
             wfEventListener.onWorkflowJobEvent(wfe);
             TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
             assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
             assertEquals(startDate, wfFailMessage.getStartTime());
             assertEquals(endDate, wfFailMessage.getEndTime());
@@ -193,7 +193,7 @@ public class TestJMSJobEventListener ext
             wfEventListener.onWorkflowJobEvent(wfe);
             TextMessage message = (TextMessage) consumer.receive(5000);
             assertFalse(message.getText().contains("endTime"));
-            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
             assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus());
             assertEquals(startDate, wfFailMessage.getStartTime());
             assertEquals("wfId1", wfFailMessage.getId());
@@ -226,7 +226,7 @@ public class TestJMSJobEventListener ext
             MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
             wfEventListener.onWorkflowJobEvent(wfe);
             TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
             Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
             assertEquals("user_1", wfFailMessage.getUser());
             assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
@@ -276,7 +276,7 @@ public class TestJMSJobEventListener ext
             MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
             wfEventListener.onWorkflowJobEvent(wfe);
             TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
             Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
             assertEquals("user1", wfFailMessage.getUser());
             assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
@@ -303,7 +303,7 @@ public class TestJMSJobEventListener ext
             MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
             wfEventListener.onWorkflowJobEvent(wfe);
             TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
             Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
             assertEquals("user1", wfFailMessage.getUser());
             assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
@@ -339,8 +339,7 @@ public class TestJMSJobEventListener ext
             assertNotNull(jmsContext);
             broker.stop();
             jmsContext = getConnectionContext();
-            // Exception Listener should have removed the conn context from
-            // connection map
+            // Exception Listener should have removed the old conn context
             assertNull(jmsContext);
             broker = new BrokerService();
             broker.addConnector(brokerURl);
@@ -381,7 +380,7 @@ public class TestJMSJobEventListener ext
             assertFalse(message.getText().contains("endTime"));
             assertFalse(message.getText().contains("errorCode"));
             assertFalse(message.getText().contains("errorMessage"));
-            CoordinatorActionMessage coordActionWaitingMessage = (CoordinatorActionMessage) JMSMessagingUtils
+            CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils
                     .getEventMessage(message);
             assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
             assertEquals(startDate, coordActionWaitingMessage.getStartTime());
@@ -419,7 +418,7 @@ public class TestJMSJobEventListener ext
             assertFalse(message.getText().contains("errorCode"));
             assertFalse(message.getText().contains("errorMessage"));
             assertFalse(message.getText().contains("missingDependency"));
-            CoordinatorActionMessage coordActionStartMessage = (CoordinatorActionMessage) JMSMessagingUtils
+            CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils
                     .getEventMessage(message);
             assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
             assertEquals(startDate, coordActionStartMessage.getStartTime());
@@ -456,7 +455,7 @@ public class TestJMSJobEventListener ext
             assertFalse(message.getText().contains("errorCode"));
             assertFalse(message.getText().contains("errorMessage"));
             assertFalse(message.getText().contains("missingDependency"));
-            CoordinatorActionMessage coordActionSuccessMessage = (CoordinatorActionMessage) JMSMessagingUtils
+            CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils
                     .getEventMessage(message);
             assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
             assertEquals(startDate, coordActionSuccessMessage.getStartTime());
@@ -494,7 +493,7 @@ public class TestJMSJobEventListener ext
             coordEventListener.onCoordinatorActionEvent(cae);
             TextMessage message = (TextMessage) consumer.receive(5000);
             assertFalse(message.getText().contains("missingDependency"));
-            CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils
+            CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
                     .getEventMessage(message);
             assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
             assertEquals(startDate, coordActionFailMessage.getStartTime());
@@ -530,7 +529,7 @@ public class TestJMSJobEventListener ext
             MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
             coordEventListener.onCoordinatorActionEvent(cae);
             TextMessage message = (TextMessage) consumer.receive(5000);
-            CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils
+            CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
                     .getEventMessage(message);
             Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
             assertEquals("user1", coordActionFailMessage.getUser());
@@ -571,7 +570,7 @@ public class TestJMSJobEventListener ext
         String jmsProps = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
         JMSConnectionInfo connInfo = new JMSConnectionInfo(jmsProps);
         JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-        ConnectionContext jmsContext = jmsService.createConnectionContext(connInfo, false);
+        ConnectionContext jmsContext = jmsService.createProducerConnectionContext(connInfo);
         return jmsContext;
 
     }

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java Wed May  8 17:59:52 2013
@@ -1,87 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.jms;
-
-import java.util.Properties;
-
-import org.apache.oozie.BundleActionBean;
-import org.apache.oozie.BundleJobBean;
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.service.JMSAccessorService;
-import org.apache.oozie.service.JMSTopicService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.workflow.WorkflowInstance;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestJMSServerInfo extends XDataTestCase {
-
-    private Services services;
-
-    @Before
-    protected void setUp() throws Exception {
-        super.setUp();
-        services = new Services();
-        services.getConf().set(
-                JMSJobEventListener.JMS_CONNECTION_PROPERTIES,
-                "java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#"
-                        + localActiveMQBroker + ";" + "connectionFactoryNames#" + "ConnectionFactory");
-        services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName());
-        services.init();
-    }
-
-    @After
-    protected void tearDown() throws Exception {
-        services.destroy();
-        super.tearDown();
-    }
-
-    @Test
-    public void testJMSConnectionInfo() {
-        try {
-            JMSServerInfo jmsServerInfo = new DefaultJMSServerInfo();
-            String connectionProperties = Services.get().getConf()
-                    .get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
-            WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-            JMSConnectionInfoBean jmsBean = jmsServerInfo.getJMSConnectionInfoBean(connectionProperties, wfj.getId());
-            assertEquals(wfj.getUser(), jmsBean.getTopicName());
-            Properties props = jmsBean.getJNDIProperties();
-            assertEquals(ActiveMQConnFactory, props.get("java.naming.factory.initial"));
-            assertEquals(localActiveMQBroker, props.get("java.naming.provider.url"));
-            assertEquals("ConnectionFactory", props.get("connectionFactoryNames"));
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
-    }
-
-}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java Wed May  8 17:59:52 2013
@@ -56,10 +56,10 @@ public class TestJMSAccessorService exte
         JMSAccessorService jmsService = services.get(JMSAccessorService.class);
         // both servers should connect to default JMS server
         JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
-        ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo, true);
+        ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo);
         assertTrue(ctxt1.isConnectionInitialized());
         JMSConnectionInfo connInfo1 = hcatService.getJMSConnectionInfo(new URI("http://unknown:80"));
-        ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1, true);
+        ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1);
         assertTrue(ctxt2.isConnectionInitialized());
         assertEquals(ctxt1, ctxt2);
         ctxt1.close();
@@ -213,13 +213,13 @@ public class TestJMSAccessorService exte
         assertTrue(jmsService.isListeningToTopic(connInfo, topic));
         assertFalse(jmsService.isConnectionInRetryList(connInfo));
         assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
-        ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo, true);
+        ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
         broker.stop();
         try {
             connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
             fail("Exception expected");
         }
-        catch (JMSException e) {
+        catch (Exception e) {
             Thread.sleep(100);
             assertFalse(jmsService.isListeningToTopic(connInfo, topic));
             assertTrue(jmsService.isConnectionInRetryList(connInfo));

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java Wed May  8 17:59:52 2013
@@ -18,6 +18,8 @@
 
 package org.apache.oozie.service;
 
+import java.util.Properties;
+
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorActionBean;
@@ -29,6 +31,8 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.service.JMSTopicService.JobType;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.workflow.WorkflowInstance;
@@ -87,24 +91,27 @@ public class TestJMSTopicService extends
     @Test
     public void testTopicAsJobId() {
         try {
+            final String TOPIC_PREFIX = "oozie.";
             services.destroy();
             services = setupServicesForTopic();
             services.getConf().set(JMSTopicService.TOPIC_NAME, "default=" + JMSTopicService.TopicType.JOBID.getValue());
+            services.getConf().set(JMSTopicService.TOPIC_PREFIX, TOPIC_PREFIX);
             services.init();
             JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
             WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
-            assertEquals(wfj.getId(), jmsTopicService.getTopic(wfj.getId()));
+            assertEquals(TOPIC_PREFIX, jmsTopicService.getTopicPrefix());
+            assertEquals(TOPIC_PREFIX+wfj.getId(), jmsTopicService.getTopic(wfj.getId()));
             WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
-            assertEquals(wfj.getId(), jmsTopicService.getTopic(wab.getId()));
+            assertEquals(TOPIC_PREFIX+wfj.getId(), jmsTopicService.getTopic(wab.getId()));
             CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
-            assertEquals(cjb.getId(), jmsTopicService.getTopic(cjb.getId()));
+            assertEquals(TOPIC_PREFIX+cjb.getId(), jmsTopicService.getTopic(cjb.getId()));
             CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                     "coord-action-for-action-input-check.xml", 0);
-            assertEquals(cjb.getId(), jmsTopicService.getTopic(cab.getId()));
+            assertEquals(TOPIC_PREFIX+cjb.getId(), jmsTopicService.getTopic(cab.getId()));
             BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
-            assertEquals(bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
+            assertEquals(TOPIC_PREFIX+bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
             BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
-            assertEquals(bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
+            assertEquals(TOPIC_PREFIX+bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
         }
         catch (Exception e) {
             e.printStackTrace();
@@ -241,4 +248,56 @@ public class TestJMSTopicService extends
         }
     }
 
+    @Test
+    public void testTopicProperties1() {
+        try {
+            services.destroy();
+            services = setupServicesForTopic();
+            services.init();
+            JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+            Properties props = jmsTopicService.getTopicPatternProperties();
+            assertEquals("${username}", props.get(AppType.WORKFLOW_JOB));
+            assertEquals("${username}", props.get(AppType.WORKFLOW_ACTION));
+            assertEquals("${username}", props.get(AppType.COORDINATOR_JOB));
+            assertEquals("${username}", props.get(AppType.COORDINATOR_ACTION));
+            assertEquals("${username}", props.get(AppType.BUNDLE_JOB));
+            assertEquals("${username}", props.get(AppType.BUNDLE_ACTION));
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+
+    @Test
+    public void testTopicProperties2() {
+        try {
+            services.destroy();
+            services = setupServicesForTopic();
+            services.getConf().set(
+                    JMSTopicService.TOPIC_NAME,
+                    JMSTopicService.JobType.WORKFLOW.getValue() + " = workflow,"
+                            + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord");
+            services.init();
+            JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+            Properties props = jmsTopicService.getTopicPatternProperties();
+            assertEquals("workflow", props.get(AppType.WORKFLOW_JOB));
+            assertEquals("workflow", props.get(AppType.WORKFLOW_ACTION));
+
+            assertEquals("coord", props.get(AppType.COORDINATOR_JOB));
+            assertEquals("coord", props.get(AppType.COORDINATOR_ACTION));
+
+            assertEquals("${username}", props.get(AppType.BUNDLE_JOB));
+            assertEquals("${username}", props.get(AppType.BUNDLE_ACTION));
+
+            services.destroy();
+
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
 }

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java Wed May  8 17:59:52 2013
@@ -178,12 +178,6 @@ public class MockDagEngineService extend
         }
 
         @Override
-        public JMSConnectionInfoBean getJMSConnectionInfo(String jobId) throws DagEngineException {
-            did = RestConstants.JOB_SHOW_JMS_INFO;
-            return createDummyJMSConnectionInfo();
-        }
-
-        @Override
         public String getDefinition(String jobId) throws DagEngineException {
             did = RestConstants.JOB_SHOW_DEFINITION;
             int idx = validateWorkflowIdx(jobId);
@@ -241,7 +235,7 @@ public class MockDagEngineService extend
         jmsProps.setProperty("k1", "v1");
         jmsProps.setProperty("k2", "v2");
         jmsBean.setJNDIProperties(jmsProps);
-        jmsBean.setTopicName("topic");
+        jmsBean.setTopicPrefix("topicPrefix");
         return jmsBean;
     }
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java Wed May  8 17:59:52 2013
@@ -337,22 +337,4 @@ public class TestV1JobServlet extends Da
             }
         });
     }
-
-    public void testJMSInfo() throws Exception {
-        runTest("/v1/job/*", V1JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                MockDagEngineService.reset();
-                Map<String, String> params = new HashMap<String, String>();
-                params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_JMS_INFO);
-                URL url = createURL(MockDagEngineService.JOB_ID + 1 + MockDagEngineService.JOB_ID_END, params);
-                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-                conn.setRequestMethod("GET");
-                assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
-                assertEquals(RestConstants.JOB_SHOW_JMS_INFO, MockDagEngineService.did);
-                return null;
-            }
-        });
-    }
-
 }

Modified: oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki
URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki Wed May  8 17:59:52 2013
@@ -910,14 +910,32 @@ Assuming Oozie is runing at =OOZIE_URL=,
    * <OOZIE_URL>/v2/job
    * <OOZIE_URL>/v2/jobs
 
-Please note that v1 and v2 are almost identical.
-Only difference is the JSON format of Job Information API (*/job) particularly for map-reduce action.
-No change for other actions.
+*Changes in v2 job API:*
 
+There is a difference in the JSON format of Job Information API (*/job) particularly for map-reduce action.
+No change for other actions.
 In v1, externalId and consoleUrl point to spawned child job ID, and exteranlChildIDs is null in map-reduce action.
 In v2, externalId and consoleUrl point to launcher job ID, and exteranlChildIDs is spawned child job ID in map-reduce action.
 
-v2/admin, v2/jobs remain the same with v1/admin, v1/jobs
+v2 supports retrieving of JMS topic on which job notifications are sent
+
+*REST API URL:*
+
+<verbatim>
+GET http://localhost:11000/oozie/v2/job/0000002-130507145349661-oozie-vira-W?show=jmstopic
+</verbatim>
+
+*Changes in v2 admin API:*
+
+v2 adds support for retrieving JMS connection information related to JMS notifications.
+
+*REST API URL:*
+
+<verbatim>
+GET http://localhost:11000/oozie/v2/admin/jmsinfo
+</verbatim>
+
+v2/jobs remain the same as v1/jobs
 
 ---+++ Job and Jobs End-Points
 

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed May  8 17:59:52 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1347 Additions to JMS topic API (virag)
 OOZIE-1231 Provide access to launcher job URL from web console when using Map Reduce action (ryota via virag)
 OOZIE-1335 The launcher job should use uber mode in Hadoop 2 by default (rkanter)
 OOZIE-1297 Add chgrp in FS action (ryota via virag)

Modified: oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml (original)
+++ oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml Wed May  8 17:59:52 2013
@@ -49,6 +49,13 @@
     </servlet>
 
     <servlet>
+        <servlet-name>v2admin</servlet-name>
+        <display-name>Oozie admin</display-name>
+        <servlet-class>org.apache.oozie.servlet.V2AdminServlet</servlet-class>
+        <load-on-startup>1</load-on-startup>
+    </servlet>
+
+    <servlet>
         <servlet-name>callback</servlet-name>
         <display-name>Callback Notification</display-name>
         <servlet-class>org.apache.oozie.servlet.CallbackServlet</servlet-class>
@@ -114,7 +121,7 @@
     </servlet-mapping>
 
     <servlet-mapping>
-        <servlet-name>v1admin</servlet-name>
+        <servlet-name>v2admin</servlet-name>
         <url-pattern>/v2/admin/*</url-pattern>
     </servlet-mapping>