You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/04/18 01:55:21 UTC

svn commit: r1469108 [3/3] - in /oozie/trunk: ./ client/ client/src/main/java/org/apache/oozie/cli/ client/src/main/java/org/apache/oozie/client/ client/src/main/java/org/apache/oozie/client/event/jms/ client/src/main/java/org/apache/oozie/client/event...

Added: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,579 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.text.ParseException;
+import java.util.Date;
+import java.util.Random;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.jms.JMSMessagingUtils;
+import org.apache.oozie.client.event.jms.JMSHeaderConstants;
+import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.WorkflowJobMessage;
+import org.apache.oozie.event.*;
+import org.apache.oozie.jms.ConnectionContext;
+import org.apache.oozie.jms.JMSConnectionInfo;
+import org.apache.oozie.jms.JMSJobEventListener;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.JMSTopicService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestJMSJobEventListener extends XTestCase {
+    private Services services;
+
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        Configuration conf = services.getConf();
+        conf.set(Services.CONF_SERVICE_EXT_CLASSES,
+                JMSAccessorService.class.getName() + "," + JMSTopicService.class.getName());
+        conf.set(JMSJobEventListener.JMS_CONNECTION_PROPERTIES, "java.naming.factory.initial#" + ActiveMQConnFactory
+                + ";" + "java.naming.provider.url#" + localActiveMQBroker + ";connectionFactoryNames#"
+                + "ConnectionFactory");
+        services.init();
+    }
+
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    @Test
+    public void testOnWorkflowJobStartedEvent() throws ParseException {
+        JMSJobEventListener wfEventListener = new JMSJobEventListener();
+        wfEventListener.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.RUNNING, "user1",
+                "wf-app-name1", startDate, null);
+
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+            wfEventListener.onWorkflowJobEvent(wfe);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertFalse(message.getText().contains("endTime"));
+            WorkflowJobMessage wfStartMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
+            assertEquals(startDate, wfStartMessage.getStartTime());
+            assertEquals("wfId1", wfStartMessage.getId());
+            assertEquals("caId1", wfStartMessage.getParentId());
+            assertEquals(MessageType.JOB, wfStartMessage.getMessageType());
+            assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType());
+            assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus());
+            assertEquals("user1", wfStartMessage.getUser());
+            assertEquals("wf-app-name1", wfStartMessage.getAppName());
+            wfEventListener.destroy();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testOnWorkflowJobSuccessEvent() throws ParseException {
+        JMSJobEventListener wfEventListener = new JMSJobEventListener();
+        wfEventListener.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        Date endDate = new Date();
+        WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUCCEEDED, "user1",
+                "wf-app-name1", startDate, endDate);
+
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+            wfEventListener.onWorkflowJobEvent(wfe);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            WorkflowJobMessage wfSuccMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
+            assertEquals(startDate, wfSuccMessage.getStartTime());
+            assertEquals(endDate, wfSuccMessage.getEndTime());
+            assertEquals("wfId1", wfSuccMessage.getId());
+            assertEquals("caId1", wfSuccMessage.getParentId());
+            assertEquals(MessageType.JOB, wfSuccMessage.getMessageType());
+            assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType());
+            assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus());
+            assertEquals("user1", wfSuccMessage.getUser());
+            assertEquals("wf-app-name1", wfSuccMessage.getAppName());
+            wfEventListener.destroy();
+
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testOnWorkflowJobFailureEvent() throws ParseException {
+        JMSJobEventListener wfEventListener = new JMSJobEventListener();
+        wfEventListener.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        Date endDate = new Date();
+        WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+                "wf-app-name1", startDate, endDate);
+        wfe.setErrorCode("dummyErrorCode");
+        wfe.setErrorMessage("dummyErrorMessage");
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+            wfEventListener.onWorkflowJobEvent(wfe);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+            assertEquals(startDate, wfFailMessage.getStartTime());
+            assertEquals(endDate, wfFailMessage.getEndTime());
+            assertEquals("wfId1", wfFailMessage.getId());
+            assertEquals("caId1", wfFailMessage.getParentId());
+            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+            assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
+            assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus());
+            assertEquals("user1", wfFailMessage.getUser());
+            assertEquals("wf-app-name1", wfFailMessage.getAppName());
+            assertEquals("dummyErrorCode", wfFailMessage.getErrorCode());
+            assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage());
+            wfEventListener.destroy();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testOnWorkflowJobSuspendEvent() throws ParseException {
+        JMSJobEventListener wfEventListener = new JMSJobEventListener();
+        wfEventListener.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUSPENDED, "user1",
+                "wf-app-name1", startDate, null);
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+            wfEventListener.onWorkflowJobEvent(wfe);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertFalse(message.getText().contains("endTime"));
+            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus());
+            assertEquals(startDate, wfFailMessage.getStartTime());
+            assertEquals("wfId1", wfFailMessage.getId());
+            assertEquals("caId1", wfFailMessage.getParentId());
+            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+            assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
+            assertEquals(EventStatus.SUSPEND, wfFailMessage.getEventStatus());
+            assertEquals("user1", wfFailMessage.getUser());
+            assertEquals("wf-app-name1", wfFailMessage.getAppName());
+            assertNull(wfFailMessage.getErrorCode());
+            assertNull(wfFailMessage.getErrorMessage());
+            wfEventListener.destroy();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testWorkflowJobSelectors() throws ParseException {
+        JMSJobEventListener wfEventListener = new JMSJobEventListener();
+        wfEventListener.init();
+        WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user_1",
+                "wf-app-name1", new Date(), new Date());
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            String selector = JMSHeaderConstants.USER + "='user_1'";
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+            wfEventListener.onWorkflowJobEvent(wfe);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+            assertEquals("user_1", wfFailMessage.getUser());
+            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+            wfEventListener.destroy();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testWorkflowJobSelectorsNegative() {
+        JMSJobEventListener wfEventListener = new JMSJobEventListener();
+        wfEventListener.init();
+        WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+                "wf-app-name1", new Date(), new Date());
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            // Pass a selector which wont match and assert for null message
+            String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+            wfEventListener.onWorkflowJobEvent(wfe);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertNull(message);
+            wfEventListener.destroy();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testWorkflowJobSelectorsOr() {
+        JMSJobEventListener wfEventListener = new JMSJobEventListener();
+        wfEventListener.init();
+        WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+                "wf-app-name1", new Date(), new Date());
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            // Pass a selector using OR condition
+            String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME
+                    + "='wf-app-name1'";
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+            wfEventListener.onWorkflowJobEvent(wfe);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+            assertEquals("user1", wfFailMessage.getUser());
+            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+            wfEventListener.destroy();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testWorkflowJobSelectorsAnd() {
+        JMSJobEventListener wfEventListener = new JMSJobEventListener();
+        wfEventListener.init();
+        WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+                "wf-app-name1", new Date(), new Date());
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            // Pass a selector using AND condition
+            String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE
+                    + "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'";
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+            wfEventListener.onWorkflowJobEvent(wfe);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+            Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+            assertEquals("user1", wfFailMessage.getUser());
+            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+            wfEventListener.destroy();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testConnectionDrop() {
+        Random random = new Random();
+        try {
+            services.destroy();
+            services = new Services();
+            Configuration conf = services.getConf();
+            conf.set(Services.CONF_SERVICE_EXT_CLASSES, JMSAccessorService.class.getName() + ","
+                    + JMSTopicService.class.getName());
+            int randomPort = 30000 + random.nextInt(10000);
+            String brokerURl = "tcp://localhost:" + randomPort;
+            conf.set(JMSJobEventListener.JMS_CONNECTION_PROPERTIES, "java.naming.factory.initial#"
+                    + ActiveMQConnFactory + ";" + "java.naming.provider.url#" + brokerURl + ";connectionFactoryNames#"
+                    + "ConnectionFactory");
+            services.init();
+            JMSJobEventListener wfEventListener = new JMSJobEventListener();
+            wfEventListener.init();
+            BrokerService broker = new BrokerService();
+            broker.addConnector(brokerURl);
+            broker.start();
+            ConnectionContext jmsContext = getConnectionContext();
+            assertNotNull(jmsContext);
+            broker.stop();
+            jmsContext = getConnectionContext();
+            // Exception Listener should have removed the conn context from
+            // connection map
+            assertNull(jmsContext);
+            broker = new BrokerService();
+            broker.addConnector(brokerURl);
+            broker.start();
+            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+                    "wf-app-name1", new Date(), new Date());
+
+            jmsContext = getConnectionContext();
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+            wfEventListener.onWorkflowJobEvent(wfe);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertNotNull(message);
+            broker.stop();
+            wfEventListener.destroy();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testOnCoordinatorActionWaitingEvent() throws ParseException {
+        JMSJobEventListener wfEventListner = new JMSJobEventListener();
+        wfEventListner.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+        CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.WAITING,
+                "user1", "wf-app-name1", nominalTime, startDate, "missingDep1");
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListner.getTopic(cae));
+            wfEventListner.onCoordinatorActionEvent(cae);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertFalse(message.getText().contains("endTime"));
+            assertFalse(message.getText().contains("errorCode"));
+            assertFalse(message.getText().contains("errorMessage"));
+            CoordinatorActionMessage coordActionWaitingMessage = (CoordinatorActionMessage) JMSMessagingUtils
+                    .getEventMessage(message);
+            assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
+            assertEquals(startDate, coordActionWaitingMessage.getStartTime());
+            assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime());
+            assertEquals("caJobId1", coordActionWaitingMessage.getParentId());
+            assertEquals("caId1", coordActionWaitingMessage.getId());
+            assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType());
+            assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType());
+            assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus());
+            assertEquals("user1", coordActionWaitingMessage.getUser());
+            assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName());
+            assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency());
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testOnCoordinatorActionStartEvent() throws ParseException {
+        JMSJobEventListener coordEventListener = new JMSJobEventListener();
+        coordEventListener.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+        CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.RUNNING,
+                "user1", "wf-app-name1", nominalTime, startDate, null);
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+            coordEventListener.onCoordinatorActionEvent(cae);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertFalse(message.getText().contains("endTime"));
+            assertFalse(message.getText().contains("errorCode"));
+            assertFalse(message.getText().contains("errorMessage"));
+            assertFalse(message.getText().contains("missingDependency"));
+            CoordinatorActionMessage coordActionStartMessage = (CoordinatorActionMessage) JMSMessagingUtils
+                    .getEventMessage(message);
+            assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
+            assertEquals(startDate, coordActionStartMessage.getStartTime());
+            assertEquals("caJobId1", coordActionStartMessage.getParentId());
+            assertEquals("caId1", coordActionStartMessage.getId());
+            assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType());
+            assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType());
+            assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus());
+            assertEquals("user1", coordActionStartMessage.getUser());
+            assertEquals("wf-app-name1", coordActionStartMessage.getAppName());
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testOnCoordinatorJobSuccessEvent() throws ParseException {
+        JMSJobEventListener coordEventListener = new JMSJobEventListener();
+        coordEventListener.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+        Date endDate = new Date();
+        CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1",
+                CoordinatorAction.Status.SUCCEEDED, "user1", "wf-app-name1", nominalTime, startDate, null);
+        cae.setEndTime(endDate);
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+            coordEventListener.onCoordinatorActionEvent(cae);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertFalse(message.getText().contains("errorCode"));
+            assertFalse(message.getText().contains("errorMessage"));
+            assertFalse(message.getText().contains("missingDependency"));
+            CoordinatorActionMessage coordActionSuccessMessage = (CoordinatorActionMessage) JMSMessagingUtils
+                    .getEventMessage(message);
+            assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
+            assertEquals(startDate, coordActionSuccessMessage.getStartTime());
+            assertEquals(endDate, coordActionSuccessMessage.getEndTime());
+            assertEquals("caJobId1", coordActionSuccessMessage.getParentId());
+            assertEquals("caId1", coordActionSuccessMessage.getId());
+            assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType());
+            assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType());
+            assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus());
+            assertEquals("user1", coordActionSuccessMessage.getUser());
+            assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName());
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testOnCoordinatorJobFailureEvent() throws ParseException {
+        JMSJobEventListener coordEventListener = new JMSJobEventListener();
+        coordEventListener.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+        Date endDate = new Date();
+        CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
+                "user1", "wf-app-name1", nominalTime, startDate, null);
+        cae.setEndTime(endDate);
+        cae.setErrorCode("E0101");
+        cae.setErrorMessage("dummyError");
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+            coordEventListener.onCoordinatorActionEvent(cae);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertFalse(message.getText().contains("missingDependency"));
+            CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils
+                    .getEventMessage(message);
+            assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
+            assertEquals(startDate, coordActionFailMessage.getStartTime());
+            assertEquals(endDate, coordActionFailMessage.getEndTime());
+            assertEquals("caJobId1", coordActionFailMessage.getParentId());
+            assertEquals("caId1", coordActionFailMessage.getId());
+            assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
+            assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType());
+            assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus());
+            assertEquals("user1", coordActionFailMessage.getUser());
+            assertEquals("wf-app-name1", coordActionFailMessage.getAppName());
+            assertEquals("E0101", coordActionFailMessage.getErrorCode());
+            assertEquals("dummyError", coordActionFailMessage.getErrorMessage());
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCoordinatorActionSelectors() throws ParseException {
+        JMSJobEventListener coordEventListener = new JMSJobEventListener();
+        coordEventListener.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+        CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
+                "user1", "wf-app-name1", nominalTime, startDate, null);
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            String selector = JMSHeaderConstants.USER + "='user1'";
+            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
+            coordEventListener.onCoordinatorActionEvent(cae);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils
+                    .getEventMessage(message);
+            Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
+            assertEquals("user1", coordActionFailMessage.getUser());
+            assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCoordinatorActionSelectorsNegative() throws ParseException {
+        JMSJobEventListener coordEventListener = new JMSJobEventListener();
+        coordEventListener.init();
+        Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+        Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+        CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
+                "user1", "wf-app-name1", nominalTime, startDate, null);
+        ConnectionContext jmsContext = getConnectionContext();
+        try {
+            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+            // Pass a selector which wont match and assert for null message
+            String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
+            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
+            coordEventListener.onCoordinatorActionEvent(cae);
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertNull(message);
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    private ConnectionContext getConnectionContext() {
+        Configuration conf = services.getConf();
+        String jmsProps = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
+        JMSConnectionInfo connInfo = new JMSConnectionInfo(jmsProps);
+        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
+        ConnectionContext jmsContext = jmsService.createConnectionContext(connInfo, false);
+        return jmsContext;
+
+    }
+
+}

Added: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,87 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.jms;
+
+import java.util.Properties;
+
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.JMSTopicService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestJMSServerInfo extends XDataTestCase {
+
+    private Services services;
+
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.getConf().set(
+                JMSJobEventListener.JMS_CONNECTION_PROPERTIES,
+                "java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#"
+                        + localActiveMQBroker + ";" + "connectionFactoryNames#" + "ConnectionFactory");
+        services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName());
+        services.init();
+    }
+
+    @After
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    @Test
+    public void testJMSConnectionInfo() {
+        try {
+            JMSServerInfo jmsServerInfo = new DefaultJMSServerInfo();
+            String connectionProperties = Services.get().getConf()
+                    .get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
+            WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            JMSConnectionInfoBean jmsBean = jmsServerInfo.getJMSConnectionInfoBean(connectionProperties, wfj.getId());
+            assertEquals(wfj.getUser(), jmsBean.getTopicName());
+            Properties props = jmsBean.getJNDIProperties();
+            assertEquals(ActiveMQConnFactory, props.get("java.naming.factory.initial"));
+            assertEquals(localActiveMQBroker, props.get("java.naming.provider.url"));
+            assertEquals("ConnectionFactory", props.get("connectionFactoryNames"));
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java Wed Apr 17 23:55:20 2013
@@ -33,13 +33,14 @@ import org.apache.oozie.test.XDataTestCa
 
 public class TestEventHandlerService extends XDataTestCase {
 
-    StringBuilder output = new StringBuilder();
+    static StringBuilder output = new StringBuilder();
 
     protected void setUp() throws Exception {
         super.setUp();
         Services services = new Services();
         Configuration conf = services.getConf();
         conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
+        conf.setClass(EventHandlerService.CONF_LISTENERS, DummyJobEventListener.class, JobEventListener.class);
         services.init();
     }
 
@@ -59,8 +60,6 @@ public class TestEventHandlerService ext
 
     public void testEventListener() throws Exception {
         EventHandlerService ehs = _testEventHandlerService();
-        ehs.addEventListener(new DummyJobEventListener());
-
         /*
          * Workflow Job events
          */
@@ -68,25 +67,25 @@ public class TestEventHandlerService ext
                 "myapp", null, null);
         ehs.queueEvent(event);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Workflow Job STARTED"));
+        assertTrue(output.toString().contains("Workflow Job event STARTED"));
         output.setLength(0);
 
         event.setStatus(WorkflowJob.Status.SUSPENDED);
         ehs.queueEvent(event);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Workflow Job SUSPEND"));
+        assertTrue(output.toString().contains("Workflow Job event SUSPEND"));
         output.setLength(0);
 
         event.setStatus(WorkflowJob.Status.SUCCEEDED);
         ehs.queueEvent(event);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Workflow Job SUCCESS"));
+        assertTrue(output.toString().contains("Workflow Job event SUCCESS"));
         output.setLength(0);
 
         event.setStatus(WorkflowJob.Status.KILLED);
         ehs.queueEvent(event);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Workflow Job FAILURE"));
+        assertTrue(output.toString().contains("Workflow Job event FAILURE"));
         output.setLength(0);
 
         /*
@@ -96,37 +95,37 @@ public class TestEventHandlerService ext
                 CoordinatorAction.Status.WAITING, getTestUser(), "myapp", null, null, null);
         ehs.queueEvent(event2);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Coord Action WAITING"));
+        assertTrue(output.toString().contains("Coord Action event WAITING"));
         output.setLength(0);
 
         event2.setStatus(CoordinatorAction.Status.RUNNING);
         ehs.queueEvent(event2);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Coord Action START"));
+        assertTrue(output.toString().contains("Coord Action event STARTED"));
         output.setLength(0);
 
         event2.setStatus(CoordinatorAction.Status.SUSPENDED);
         ehs.queueEvent(event2);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Coord Action SUSPEND"));
+        assertTrue(output.toString().contains("Coord Action event SUSPEND"));
         output.setLength(0);
 
         event2.setStatus(CoordinatorAction.Status.SUCCEEDED);
         ehs.queueEvent(event2);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Coord Action SUCCESS"));
+        assertTrue(output.toString().contains("Coord Action event SUCCESS"));
         output.setLength(0);
 
         event2.setStatus(CoordinatorAction.Status.TIMEDOUT);
         ehs.queueEvent(event2);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Coord Action FAILURE"));
+        assertTrue(output.toString().contains("Coord Action event FAILURE"));
         output.setLength(0);
 
         event2.setStatus(CoordinatorAction.Status.KILLED);
         ehs.queueEvent(event2);
         ehs.new EventWorker().run();
-        assertTrue(output.toString().contains("Coord Action FAILURE"));
+        assertTrue(output.toString().contains("Coord Action event FAILURE"));
         output.setLength(0);
     }
 
@@ -137,152 +136,40 @@ public class TestEventHandlerService ext
         return ehs;
     }
 
-    class DummyJobEventListener extends JobEventListener {
-
-        @Override
-        public void onWorkflowJobStart(WorkflowJobEvent wje) {
-            if (wje != null) {
-                output.append("Dummy Workflow Job STARTED");
-            }
-        }
-
-        @Override
-        public void onWorkflowJobSuccess(WorkflowJobEvent wje) {
-            if (wje != null) {
-                output.append("Dummy Workflow Job SUCCESS");
-            }
-        }
+    static class DummyJobEventListener extends JobEventListener {
 
         @Override
-        public void onWorkflowJobFailure(WorkflowJobEvent wje) {
+        public void onWorkflowJobEvent(WorkflowJobEvent wje) {
             if (wje != null) {
-                output.append("Dummy Workflow Job FAILURE");
-            }
-        }
-
-        @Override
-        public void onWorkflowJobSuspend(WorkflowJobEvent wje) {
-            if (wje != null) {
-                output.append("Dummy Workflow Job SUSPEND");
-            }
-        }
-
-        @Override
-        public void onWorkflowActionStart(WorkflowActionEvent wae) {
-            if (wae != null) {
-                output.append("Dummy Workflow Action START");
+                output.append("Dummy Workflow Job event " + wje.getEventStatus());
             }
         }
 
         @Override
-        public void onWorkflowActionSuccess(WorkflowActionEvent wae) {
+        public void onWorkflowActionEvent(WorkflowActionEvent wae) {
             if (wae != null) {
-                output.append("Dummy Workflow Action SUCCESS");
-            }
-        }
-
-        @Override
-        public void onWorkflowActionFailure(WorkflowActionEvent wae) {
-            if (wae != null) {
-                output.append("Dummy Workflow Action FAILURE");
-            }
-        }
-
-        @Override
-        public void onWorkflowActionSuspend(WorkflowActionEvent wae) {
-            if (wae != null) {
-                output.append("Dummy Workflow Action SUSPEND");
-            }
-        }
-
-        @Override
-        public void onCoordinatorJobStart(CoordinatorJobEvent cje) {
-            if (cje != null) {
-                output.append("Dummy Coord Job START");
-            }
-        }
-
-        @Override
-        public void onCoordinatorJobSuccess(CoordinatorJobEvent cje) {
-            if (cje != null) {
-                output.append("Dummy Coord Job SUCCESS");
+                output.append("Dummy Workflow Action event "+ wae.getEventStatus());
             }
         }
 
         @Override
-        public void onCoordinatorJobFailure(CoordinatorJobEvent cje) {
+        public void onCoordinatorJobEvent(CoordinatorJobEvent cje) {
             if (cje != null) {
-                output.append("Dummy Coord Job FAILURE");
+                output.append("Dummy Coord Job event "+cje.getEventStatus());
             }
         }
 
         @Override
-        public void onCoordinatorJobSuspend(CoordinatorJobEvent cje) {
-            if (cje != null) {
-                output.append("Dummy Coord Job SUSPEND");
-            }
-        }
-
-        @Override
-        public void onCoordinatorActionWaiting(CoordinatorActionEvent cae) {
-            if (cae != null) {
-                output.append("Dummy Coord Action WAITING");
-            }
-        }
-
-        @Override
-        public void onCoordinatorActionStart(CoordinatorActionEvent cae) {
-            if (cae != null) {
-                output.append("Dummy Coord Action START");
-            }
-        }
-
-        @Override
-        public void onCoordinatorActionSuccess(CoordinatorActionEvent cae) {
-            if (cae != null) {
-                output.append("Dummy Coord Action SUCCESS");
-            }
-        }
-
-        @Override
-        public void onCoordinatorActionFailure(CoordinatorActionEvent cae) {
+        public void onCoordinatorActionEvent(CoordinatorActionEvent cae) {
             if (cae != null) {
-                output.append("Dummy Coord Action FAILURE");
-            }
-        }
-
-        @Override
-        public void onCoordinatorActionSuspend(CoordinatorActionEvent cae) {
-            if (cae != null) {
-                output.append("Dummy Coord Action SUSPEND");
-            }
-        }
-
-        @Override
-        public void onBundleJobStart(BundleJobEvent bje) {
-            if (bje != null) {
-                output.append("Dummy Bundle Job START");
-            }
-        }
-
-        @Override
-        public void onBundleJobSuccess(BundleJobEvent bje) {
-            if (bje != null) {
-                output.append("Dummy Bundle Job SUCCESS");
-            }
-        }
-
-        @Override
-        public void onBundleJobFailure(BundleJobEvent bje) {
-            if (bje != null) {
-                output.append("Dummy Bundle Job FAILURE");
+                output.append("Dummy Coord Action event "+cae.getEventStatus());
             }
         }
 
         @Override
-        public void onBundleJobSuspend(BundleJobEvent bje) {
+        public void onBundleJobEvent(BundleJobEvent bje) {
             if (bje != null) {
-                output.append("Dummy Bundle Job SUSPEND");
+                output.append("Dummy Bundle Job event "+bje.getEventStatus());
             }
         }
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java Wed Apr 17 23:55:20 2013
@@ -56,10 +56,10 @@ public class TestJMSAccessorService exte
         JMSAccessorService jmsService = services.get(JMSAccessorService.class);
         // both servers should connect to default JMS server
         JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
-        ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo);
+        ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo, true);
         assertTrue(ctxt1.isConnectionInitialized());
         JMSConnectionInfo connInfo1 = hcatService.getJMSConnectionInfo(new URI("http://unknown:80"));
-        ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1);
+        ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1, true);
         assertTrue(ctxt2.isConnectionInitialized());
         assertEquals(ctxt1, ctxt2);
         ctxt1.close();
@@ -213,7 +213,7 @@ public class TestJMSAccessorService exte
         assertTrue(jmsService.isListeningToTopic(connInfo, topic));
         assertFalse(jmsService.isConnectionInRetryList(connInfo));
         assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
-        ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
+        ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo, true);
         broker.stop();
         try {
             connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);

Added: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestJMSTopicService extends XDataTestCase {
+
+    private Services services;
+
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = setupServicesForTopic();
+        services.init();
+    }
+
+    @After
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    private Services setupServicesForTopic() throws ServiceException {
+        Services services = new Services();
+        services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName());
+        return services;
+    }
+
+    @Test
+    public void testTopicAsUser() {
+        try {
+            JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+            WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            assertEquals(wfj.getUser(), jmsTopicService.getTopic(wfj.getId()));
+            WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+            assertEquals(wfj.getUser(), jmsTopicService.getTopic(wab.getId()));
+            CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+            assertEquals(wfj.getUser(), jmsTopicService.getTopic(cjb.getId()));
+            CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-for-action-input-check.xml", 0);
+            assertEquals(wfj.getUser(), jmsTopicService.getTopic(cab.getId()));
+            BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+            assertEquals(wfj.getUser(), jmsTopicService.getTopic(bjb.getId()));
+            BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+            assertEquals(wfj.getUser(), jmsTopicService.getTopic(bab.getBundleActionId()));
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testTopicAsJobId() {
+        try {
+            services.destroy();
+            services = setupServicesForTopic();
+            services.getConf().set(JMSTopicService.TOPIC_NAME, "default=" + JMSTopicService.TopicType.JOBID.getValue());
+            services.init();
+            JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+            WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            assertEquals(wfj.getId(), jmsTopicService.getTopic(wfj.getId()));
+            WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+            assertEquals(wfj.getId(), jmsTopicService.getTopic(wab.getId()));
+            CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+            assertEquals(cjb.getId(), jmsTopicService.getTopic(cjb.getId()));
+            CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-for-action-input-check.xml", 0);
+            assertEquals(cjb.getId(), jmsTopicService.getTopic(cab.getId()));
+            BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+            assertEquals(bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
+            BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+            assertEquals(bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testTopicAsFixedString() {
+        try {
+            services.destroy();
+            services = setupServicesForTopic();
+            services.getConf().set(
+                    JMSTopicService.TOPIC_NAME,
+                    JMSTopicService.JobType.WORKFLOW.getValue() + " =workflow,"
+                            + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord,"
+                            + JMSTopicService.JobType.BUNDLE.getValue() + "=bundle");
+            services.init();
+            JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+            WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+            WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+            assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+            CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+            assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+            CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-for-action-input-check.xml", 0);
+            assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+            BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+            assertEquals("bundle", jmsTopicService.getTopic(bjb.getId()));
+            BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+            assertEquals("bundle", jmsTopicService.getTopic(bab.getBundleActionId()));
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testMixedTopic1() {
+        try {
+            services.destroy();
+            services = setupServicesForTopic();
+            services.getConf().set(
+                    JMSTopicService.TOPIC_NAME,
+                    JMSTopicService.JobType.WORKFLOW.getValue() + " = workflow,"
+                            + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord, default = "
+                            + JMSTopicService.TopicType.JOBID.getValue());
+            services.init();
+            JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+            WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+            WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+            assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+            CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+            assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+            CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-for-action-input-check.xml", 0);
+            assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+            BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+            assertEquals(bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
+            BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+            assertEquals(bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testMixedTopic2() {
+        try {
+            services.destroy();
+            services = setupServicesForTopic();
+            services.getConf().set(
+                    JMSTopicService.TOPIC_NAME,
+                    JMSTopicService.JobType.WORKFLOW.getValue() + " = workflow,"
+                            + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord");
+            services.init();
+            JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+            WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+            WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+            assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+            CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+            assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+            CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-for-action-input-check.xml", 0);
+            assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+            BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+            // As no default is specified, user will be considered as topic
+            assertEquals(bjb.getUser(), jmsTopicService.getTopic(bjb.getId()));
+            BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+            assertEquals(bjb.getUser(), jmsTopicService.getTopic(bab.getBundleActionId()));
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testIncorrectConfigurationJobType() {
+        try {
+            services.destroy();
+            services = setupServicesForTopic();
+            services.getConf().set(JMSTopicService.TOPIC_NAME,
+                    "InvalidJobType" + " = workflow," + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord");
+            services.init();
+            fail("Expected Service Exception");
+        }
+        catch (ServiceException se) {
+            assertTrue(se.getMessage().contains("Incorrect job type"));
+        }
+    }
+
+    @Test
+    public void testIncorrectConfigurationDefault() {
+        try {
+            services.destroy();
+            services = setupServicesForTopic();
+            services.getConf().set(JMSTopicService.TOPIC_NAME, "default=" + "invalidvalue");
+            services.init();
+            fail("Expected Service Exception");
+        }
+        catch (ServiceException se) {
+            assertTrue(se.getMessage().contains("not allowed in default"));
+        }
+    }
+
+}

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,11 +30,13 @@ import org.apache.oozie.DagEngineExcepti
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowsInfo;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
 import org.apache.oozie.client.rest.JsonWorkflowAction;
 import org.apache.oozie.client.rest.JsonWorkflowJob;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.service.DagEngineService;
 import org.apache.oozie.util.XmlUtils;
+import org.json.simple.JSONValue;
 
 public class MockDagEngineService extends DagEngineService {
     public static final String JOB_ID = "job-";
@@ -176,6 +178,12 @@ public class MockDagEngineService extend
         }
 
         @Override
+        public JMSConnectionInfoBean getJMSConnectionInfo(String jobId) throws DagEngineException {
+            did = RestConstants.JOB_SHOW_JMS_INFO;
+            return createDummyJMSConnectionInfo();
+        }
+
+        @Override
         public String getDefinition(String jobId) throws DagEngineException {
             did = RestConstants.JOB_SHOW_DEFINITION;
             int idx = validateWorkflowIdx(jobId);
@@ -227,6 +235,16 @@ public class MockDagEngineService extend
         }
     }
 
+    private static JMSConnectionInfoBean createDummyJMSConnectionInfo() {
+        JMSConnectionInfoBean jmsBean = new JMSConnectionInfoBean();
+        Properties jmsProps = new Properties();
+        jmsProps.setProperty("k1", "v1");
+        jmsProps.setProperty("k2", "v2");
+        jmsBean.setJNDIProperties(jmsProps);
+        jmsBean.setTopicName("topic");
+        return jmsBean;
+    }
+
     private static WorkflowJob createDummyWorkflow(int idx) {
         JsonWorkflowJob workflow = new JsonWorkflowJob();
         workflow.setId(JOB_ID + idx + JOB_ID_END);

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java Wed Apr 17 23:55:20 2013
@@ -337,4 +337,22 @@ public class TestV1JobServlet extends Da
             }
         });
     }
+
+    public void testJMSInfo() throws Exception {
+        runTest("/v1/job/*", V1JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                MockDagEngineService.reset();
+                Map<String, String> params = new HashMap<String, String>();
+                params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_JMS_INFO);
+                URL url = createURL(MockDagEngineService.JOB_ID + 1 + MockDagEngineService.JOB_ID_END, params);
+                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+                conn.setRequestMethod("GET");
+                assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+                assertEquals(RestConstants.JOB_SHOW_JMS_INFO, MockDagEngineService.did);
+                return null;
+            }
+        });
+    }
+
 }

Modified: oozie/trunk/pom.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/pom.xml?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/pom.xml (original)
+++ oozie/trunk/pom.xml Wed Apr 17 23:55:20 2013
@@ -272,6 +272,18 @@
               <version>1.6.6</version>
           </dependency>
 
+            <dependency>
+                <groupId>org.codehaus.jackson</groupId>
+                <artifactId>jackson-mapper-asl</artifactId>
+                <version>1.8.8</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.codehaus.jackson</groupId>
+                <artifactId>jackson-core-asl</artifactId>
+                <version>1.8.8</version>
+            </dependency>
+
             <!-- core -->
             <dependency>
                 <groupId>org.apache.oozie</groupId>

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Apr 17 23:55:20 2013
@@ -1,5 +1,7 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1235 Client API for retrieving topic and jms connection related details (virag)
+OOZIE-1234 JMS Event Listeners for publishing notifications related to workflow and coordinator (virag)
 OOZIE-1281  Hiveaction should populate externalChildIDs (rohini via virag)
 OOZIE-1322 show child job URL tab selectively for pig action (ryota via mona)
 OOZIE-1307 Cover package org.apache.oozie.action.ssh with unit tests (vbondarev via rkanter)