You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/04/18 01:55:21 UTC
svn commit: r1469108 [3/3] - in /oozie/trunk: ./ client/
client/src/main/java/org/apache/oozie/cli/
client/src/main/java/org/apache/oozie/client/
client/src/main/java/org/apache/oozie/client/event/jms/
client/src/main/java/org/apache/oozie/client/event...
Added: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,579 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.text.ParseException;
+import java.util.Date;
+import java.util.Random;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.jms.JMSMessagingUtils;
+import org.apache.oozie.client.event.jms.JMSHeaderConstants;
+import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.WorkflowJobMessage;
+import org.apache.oozie.event.*;
+import org.apache.oozie.jms.ConnectionContext;
+import org.apache.oozie.jms.JMSConnectionInfo;
+import org.apache.oozie.jms.JMSJobEventListener;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.JMSTopicService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestJMSJobEventListener extends XTestCase {
+ private Services services;
+
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ Configuration conf = services.getConf();
+ conf.set(Services.CONF_SERVICE_EXT_CLASSES,
+ JMSAccessorService.class.getName() + "," + JMSTopicService.class.getName());
+ conf.set(JMSJobEventListener.JMS_CONNECTION_PROPERTIES, "java.naming.factory.initial#" + ActiveMQConnFactory
+ + ";" + "java.naming.provider.url#" + localActiveMQBroker + ";connectionFactoryNames#"
+ + "ConnectionFactory");
+ services.init();
+ }
+
+ @After
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ @Test
+ public void testOnWorkflowJobStartedEvent() throws ParseException {
+ JMSJobEventListener wfEventListener = new JMSJobEventListener();
+ wfEventListener.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.RUNNING, "user1",
+ "wf-app-name1", startDate, null);
+
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("endTime"));
+ WorkflowJobMessage wfStartMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
+ assertEquals(startDate, wfStartMessage.getStartTime());
+ assertEquals("wfId1", wfStartMessage.getId());
+ assertEquals("caId1", wfStartMessage.getParentId());
+ assertEquals(MessageType.JOB, wfStartMessage.getMessageType());
+ assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType());
+ assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus());
+ assertEquals("user1", wfStartMessage.getUser());
+ assertEquals("wf-app-name1", wfStartMessage.getAppName());
+ wfEventListener.destroy();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testOnWorkflowJobSuccessEvent() throws ParseException {
+ JMSJobEventListener wfEventListener = new JMSJobEventListener();
+ wfEventListener.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ Date endDate = new Date();
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUCCEEDED, "user1",
+ "wf-app-name1", startDate, endDate);
+
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfSuccMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
+ assertEquals(startDate, wfSuccMessage.getStartTime());
+ assertEquals(endDate, wfSuccMessage.getEndTime());
+ assertEquals("wfId1", wfSuccMessage.getId());
+ assertEquals("caId1", wfSuccMessage.getParentId());
+ assertEquals(MessageType.JOB, wfSuccMessage.getMessageType());
+ assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType());
+ assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus());
+ assertEquals("user1", wfSuccMessage.getUser());
+ assertEquals("wf-app-name1", wfSuccMessage.getAppName());
+ wfEventListener.destroy();
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testOnWorkflowJobFailureEvent() throws ParseException {
+ JMSJobEventListener wfEventListener = new JMSJobEventListener();
+ wfEventListener.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ Date endDate = new Date();
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+ "wf-app-name1", startDate, endDate);
+ wfe.setErrorCode("dummyErrorCode");
+ wfe.setErrorMessage("dummyErrorMessage");
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+ assertEquals(startDate, wfFailMessage.getStartTime());
+ assertEquals(endDate, wfFailMessage.getEndTime());
+ assertEquals("wfId1", wfFailMessage.getId());
+ assertEquals("caId1", wfFailMessage.getParentId());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
+ assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus());
+ assertEquals("user1", wfFailMessage.getUser());
+ assertEquals("wf-app-name1", wfFailMessage.getAppName());
+ assertEquals("dummyErrorCode", wfFailMessage.getErrorCode());
+ assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage());
+ wfEventListener.destroy();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testOnWorkflowJobSuspendEvent() throws ParseException {
+ JMSJobEventListener wfEventListener = new JMSJobEventListener();
+ wfEventListener.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUSPENDED, "user1",
+ "wf-app-name1", startDate, null);
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("endTime"));
+ WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus());
+ assertEquals(startDate, wfFailMessage.getStartTime());
+ assertEquals("wfId1", wfFailMessage.getId());
+ assertEquals("caId1", wfFailMessage.getParentId());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
+ assertEquals(EventStatus.SUSPEND, wfFailMessage.getEventStatus());
+ assertEquals("user1", wfFailMessage.getUser());
+ assertEquals("wf-app-name1", wfFailMessage.getAppName());
+ assertNull(wfFailMessage.getErrorCode());
+ assertNull(wfFailMessage.getErrorMessage());
+ wfEventListener.destroy();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testWorkflowJobSelectors() throws ParseException {
+ JMSJobEventListener wfEventListener = new JMSJobEventListener();
+ wfEventListener.init();
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user_1",
+ "wf-app-name1", new Date(), new Date());
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ String selector = JMSHeaderConstants.USER + "='user_1'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+ assertEquals("user_1", wfFailMessage.getUser());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ wfEventListener.destroy();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testWorkflowJobSelectorsNegative() {
+ JMSJobEventListener wfEventListener = new JMSJobEventListener();
+ wfEventListener.init();
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+ "wf-app-name1", new Date(), new Date());
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Pass a selector which wont match and assert for null message
+ String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertNull(message);
+ wfEventListener.destroy();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testWorkflowJobSelectorsOr() {
+ JMSJobEventListener wfEventListener = new JMSJobEventListener();
+ wfEventListener.init();
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+ "wf-app-name1", new Date(), new Date());
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Pass a selector using OR condition
+ String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME
+ + "='wf-app-name1'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+ assertEquals("user1", wfFailMessage.getUser());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ wfEventListener.destroy();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testWorkflowJobSelectorsAnd() {
+ JMSJobEventListener wfEventListener = new JMSJobEventListener();
+ wfEventListener.init();
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+ "wf-app-name1", new Date(), new Date());
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Pass a selector using AND condition
+ String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE
+ + "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+ assertEquals("user1", wfFailMessage.getUser());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ wfEventListener.destroy();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConnectionDrop() {
+ Random random = new Random();
+ try {
+ services.destroy();
+ services = new Services();
+ Configuration conf = services.getConf();
+ conf.set(Services.CONF_SERVICE_EXT_CLASSES, JMSAccessorService.class.getName() + ","
+ + JMSTopicService.class.getName());
+ int randomPort = 30000 + random.nextInt(10000);
+ String brokerURl = "tcp://localhost:" + randomPort;
+ conf.set(JMSJobEventListener.JMS_CONNECTION_PROPERTIES, "java.naming.factory.initial#"
+ + ActiveMQConnFactory + ";" + "java.naming.provider.url#" + brokerURl + ";connectionFactoryNames#"
+ + "ConnectionFactory");
+ services.init();
+ JMSJobEventListener wfEventListener = new JMSJobEventListener();
+ wfEventListener.init();
+ BrokerService broker = new BrokerService();
+ broker.addConnector(brokerURl);
+ broker.start();
+ ConnectionContext jmsContext = getConnectionContext();
+ assertNotNull(jmsContext);
+ broker.stop();
+ jmsContext = getConnectionContext();
+ // Exception Listener should have removed the conn context from
+ // connection map
+ assertNull(jmsContext);
+ broker = new BrokerService();
+ broker.addConnector(brokerURl);
+ broker.start();
+ WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
+ "wf-app-name1", new Date(), new Date());
+
+ jmsContext = getConnectionContext();
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertNotNull(message);
+ broker.stop();
+ wfEventListener.destroy();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testOnCoordinatorActionWaitingEvent() throws ParseException {
+ JMSJobEventListener wfEventListner = new JMSJobEventListener();
+ wfEventListner.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+ CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.WAITING,
+ "user1", "wf-app-name1", nominalTime, startDate, "missingDep1");
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListner.getTopic(cae));
+ wfEventListner.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("endTime"));
+ assertFalse(message.getText().contains("errorCode"));
+ assertFalse(message.getText().contains("errorMessage"));
+ CoordinatorActionMessage coordActionWaitingMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ .getEventMessage(message);
+ assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
+ assertEquals(startDate, coordActionWaitingMessage.getStartTime());
+ assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime());
+ assertEquals("caJobId1", coordActionWaitingMessage.getParentId());
+ assertEquals("caId1", coordActionWaitingMessage.getId());
+ assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType());
+ assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus());
+ assertEquals("user1", coordActionWaitingMessage.getUser());
+ assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName());
+ assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testOnCoordinatorActionStartEvent() throws ParseException {
+ JMSJobEventListener coordEventListener = new JMSJobEventListener();
+ coordEventListener.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+ CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.RUNNING,
+ "user1", "wf-app-name1", nominalTime, startDate, null);
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("endTime"));
+ assertFalse(message.getText().contains("errorCode"));
+ assertFalse(message.getText().contains("errorMessage"));
+ assertFalse(message.getText().contains("missingDependency"));
+ CoordinatorActionMessage coordActionStartMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ .getEventMessage(message);
+ assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
+ assertEquals(startDate, coordActionStartMessage.getStartTime());
+ assertEquals("caJobId1", coordActionStartMessage.getParentId());
+ assertEquals("caId1", coordActionStartMessage.getId());
+ assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType());
+ assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus());
+ assertEquals("user1", coordActionStartMessage.getUser());
+ assertEquals("wf-app-name1", coordActionStartMessage.getAppName());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testOnCoordinatorJobSuccessEvent() throws ParseException {
+ JMSJobEventListener coordEventListener = new JMSJobEventListener();
+ coordEventListener.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+ Date endDate = new Date();
+ CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1",
+ CoordinatorAction.Status.SUCCEEDED, "user1", "wf-app-name1", nominalTime, startDate, null);
+ cae.setEndTime(endDate);
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("errorCode"));
+ assertFalse(message.getText().contains("errorMessage"));
+ assertFalse(message.getText().contains("missingDependency"));
+ CoordinatorActionMessage coordActionSuccessMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ .getEventMessage(message);
+ assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
+ assertEquals(startDate, coordActionSuccessMessage.getStartTime());
+ assertEquals(endDate, coordActionSuccessMessage.getEndTime());
+ assertEquals("caJobId1", coordActionSuccessMessage.getParentId());
+ assertEquals("caId1", coordActionSuccessMessage.getId());
+ assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType());
+ assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus());
+ assertEquals("user1", coordActionSuccessMessage.getUser());
+ assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testOnCoordinatorJobFailureEvent() throws ParseException {
+ JMSJobEventListener coordEventListener = new JMSJobEventListener();
+ coordEventListener.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+ Date endDate = new Date();
+ CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
+ "user1", "wf-app-name1", nominalTime, startDate, null);
+ cae.setEndTime(endDate);
+ cae.setErrorCode("E0101");
+ cae.setErrorMessage("dummyError");
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("missingDependency"));
+ CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ .getEventMessage(message);
+ assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
+ assertEquals(startDate, coordActionFailMessage.getStartTime());
+ assertEquals(endDate, coordActionFailMessage.getEndTime());
+ assertEquals("caJobId1", coordActionFailMessage.getParentId());
+ assertEquals("caId1", coordActionFailMessage.getId());
+ assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType());
+ assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus());
+ assertEquals("user1", coordActionFailMessage.getUser());
+ assertEquals("wf-app-name1", coordActionFailMessage.getAppName());
+ assertEquals("E0101", coordActionFailMessage.getErrorCode());
+ assertEquals("dummyError", coordActionFailMessage.getErrorMessage());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCoordinatorActionSelectors() throws ParseException {
+ JMSJobEventListener coordEventListener = new JMSJobEventListener();
+ coordEventListener.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+ CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
+ "user1", "wf-app-name1", nominalTime, startDate, null);
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ String selector = JMSHeaderConstants.USER + "='user1'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ .getEventMessage(message);
+ Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
+ assertEquals("user1", coordActionFailMessage.getUser());
+ assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCoordinatorActionSelectorsNegative() throws ParseException {
+ JMSJobEventListener coordEventListener = new JMSJobEventListener();
+ coordEventListener.init();
+ Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
+ Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
+ CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
+ "user1", "wf-app-name1", nominalTime, startDate, null);
+ ConnectionContext jmsContext = getConnectionContext();
+ try {
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Pass a selector which wont match and assert for null message
+ String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertNull(message);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private ConnectionContext getConnectionContext() {
+ Configuration conf = services.getConf();
+ String jmsProps = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
+ JMSConnectionInfo connInfo = new JMSConnectionInfo(jmsProps);
+ JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
+ ConnectionContext jmsContext = jmsService.createConnectionContext(connInfo, false);
+ return jmsContext;
+
+ }
+
+}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,87 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.jms;
+
+import java.util.Properties;
+
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.JMSTopicService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestJMSServerInfo extends XDataTestCase {
+
+ private Services services;
+
+ @Before
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.getConf().set(
+ JMSJobEventListener.JMS_CONNECTION_PROPERTIES,
+ "java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#"
+ + localActiveMQBroker + ";" + "connectionFactoryNames#" + "ConnectionFactory");
+ services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName());
+ services.init();
+ }
+
+ @After
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ @Test
+ public void testJMSConnectionInfo() {
+ try {
+ JMSServerInfo jmsServerInfo = new DefaultJMSServerInfo();
+ String connectionProperties = Services.get().getConf()
+ .get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
+ WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ JMSConnectionInfoBean jmsBean = jmsServerInfo.getJMSConnectionInfoBean(connectionProperties, wfj.getId());
+ assertEquals(wfj.getUser(), jmsBean.getTopicName());
+ Properties props = jmsBean.getJNDIProperties();
+ assertEquals(ActiveMQConnFactory, props.get("java.naming.factory.initial"));
+ assertEquals(localActiveMQBroker, props.get("java.naming.provider.url"));
+ assertEquals("ConnectionFactory", props.get("connectionFactoryNames"));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java Wed Apr 17 23:55:20 2013
@@ -33,13 +33,14 @@ import org.apache.oozie.test.XDataTestCa
public class TestEventHandlerService extends XDataTestCase {
- StringBuilder output = new StringBuilder();
+ static StringBuilder output = new StringBuilder();
protected void setUp() throws Exception {
super.setUp();
Services services = new Services();
Configuration conf = services.getConf();
conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
+ conf.setClass(EventHandlerService.CONF_LISTENERS, DummyJobEventListener.class, JobEventListener.class);
services.init();
}
@@ -59,8 +60,6 @@ public class TestEventHandlerService ext
public void testEventListener() throws Exception {
EventHandlerService ehs = _testEventHandlerService();
- ehs.addEventListener(new DummyJobEventListener());
-
/*
* Workflow Job events
*/
@@ -68,25 +67,25 @@ public class TestEventHandlerService ext
"myapp", null, null);
ehs.queueEvent(event);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Workflow Job STARTED"));
+ assertTrue(output.toString().contains("Workflow Job event STARTED"));
output.setLength(0);
event.setStatus(WorkflowJob.Status.SUSPENDED);
ehs.queueEvent(event);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Workflow Job SUSPEND"));
+ assertTrue(output.toString().contains("Workflow Job event SUSPEND"));
output.setLength(0);
event.setStatus(WorkflowJob.Status.SUCCEEDED);
ehs.queueEvent(event);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Workflow Job SUCCESS"));
+ assertTrue(output.toString().contains("Workflow Job event SUCCESS"));
output.setLength(0);
event.setStatus(WorkflowJob.Status.KILLED);
ehs.queueEvent(event);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Workflow Job FAILURE"));
+ assertTrue(output.toString().contains("Workflow Job event FAILURE"));
output.setLength(0);
/*
@@ -96,37 +95,37 @@ public class TestEventHandlerService ext
CoordinatorAction.Status.WAITING, getTestUser(), "myapp", null, null, null);
ehs.queueEvent(event2);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Coord Action WAITING"));
+ assertTrue(output.toString().contains("Coord Action event WAITING"));
output.setLength(0);
event2.setStatus(CoordinatorAction.Status.RUNNING);
ehs.queueEvent(event2);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Coord Action START"));
+ assertTrue(output.toString().contains("Coord Action event STARTED"));
output.setLength(0);
event2.setStatus(CoordinatorAction.Status.SUSPENDED);
ehs.queueEvent(event2);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Coord Action SUSPEND"));
+ assertTrue(output.toString().contains("Coord Action event SUSPEND"));
output.setLength(0);
event2.setStatus(CoordinatorAction.Status.SUCCEEDED);
ehs.queueEvent(event2);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Coord Action SUCCESS"));
+ assertTrue(output.toString().contains("Coord Action event SUCCESS"));
output.setLength(0);
event2.setStatus(CoordinatorAction.Status.TIMEDOUT);
ehs.queueEvent(event2);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Coord Action FAILURE"));
+ assertTrue(output.toString().contains("Coord Action event FAILURE"));
output.setLength(0);
event2.setStatus(CoordinatorAction.Status.KILLED);
ehs.queueEvent(event2);
ehs.new EventWorker().run();
- assertTrue(output.toString().contains("Coord Action FAILURE"));
+ assertTrue(output.toString().contains("Coord Action event FAILURE"));
output.setLength(0);
}
@@ -137,152 +136,40 @@ public class TestEventHandlerService ext
return ehs;
}
- class DummyJobEventListener extends JobEventListener {
-
- @Override
- public void onWorkflowJobStart(WorkflowJobEvent wje) {
- if (wje != null) {
- output.append("Dummy Workflow Job STARTED");
- }
- }
-
- @Override
- public void onWorkflowJobSuccess(WorkflowJobEvent wje) {
- if (wje != null) {
- output.append("Dummy Workflow Job SUCCESS");
- }
- }
+ static class DummyJobEventListener extends JobEventListener {
@Override
- public void onWorkflowJobFailure(WorkflowJobEvent wje) {
+ public void onWorkflowJobEvent(WorkflowJobEvent wje) {
if (wje != null) {
- output.append("Dummy Workflow Job FAILURE");
- }
- }
-
- @Override
- public void onWorkflowJobSuspend(WorkflowJobEvent wje) {
- if (wje != null) {
- output.append("Dummy Workflow Job SUSPEND");
- }
- }
-
- @Override
- public void onWorkflowActionStart(WorkflowActionEvent wae) {
- if (wae != null) {
- output.append("Dummy Workflow Action START");
+ output.append("Dummy Workflow Job event " + wje.getEventStatus());
}
}
@Override
- public void onWorkflowActionSuccess(WorkflowActionEvent wae) {
+ public void onWorkflowActionEvent(WorkflowActionEvent wae) {
if (wae != null) {
- output.append("Dummy Workflow Action SUCCESS");
- }
- }
-
- @Override
- public void onWorkflowActionFailure(WorkflowActionEvent wae) {
- if (wae != null) {
- output.append("Dummy Workflow Action FAILURE");
- }
- }
-
- @Override
- public void onWorkflowActionSuspend(WorkflowActionEvent wae) {
- if (wae != null) {
- output.append("Dummy Workflow Action SUSPEND");
- }
- }
-
- @Override
- public void onCoordinatorJobStart(CoordinatorJobEvent cje) {
- if (cje != null) {
- output.append("Dummy Coord Job START");
- }
- }
-
- @Override
- public void onCoordinatorJobSuccess(CoordinatorJobEvent cje) {
- if (cje != null) {
- output.append("Dummy Coord Job SUCCESS");
+ output.append("Dummy Workflow Action event "+ wae.getEventStatus());
}
}
@Override
- public void onCoordinatorJobFailure(CoordinatorJobEvent cje) {
+ public void onCoordinatorJobEvent(CoordinatorJobEvent cje) {
if (cje != null) {
- output.append("Dummy Coord Job FAILURE");
+ output.append("Dummy Coord Job event "+cje.getEventStatus());
}
}
@Override
- public void onCoordinatorJobSuspend(CoordinatorJobEvent cje) {
- if (cje != null) {
- output.append("Dummy Coord Job SUSPEND");
- }
- }
-
- @Override
- public void onCoordinatorActionWaiting(CoordinatorActionEvent cae) {
- if (cae != null) {
- output.append("Dummy Coord Action WAITING");
- }
- }
-
- @Override
- public void onCoordinatorActionStart(CoordinatorActionEvent cae) {
- if (cae != null) {
- output.append("Dummy Coord Action START");
- }
- }
-
- @Override
- public void onCoordinatorActionSuccess(CoordinatorActionEvent cae) {
- if (cae != null) {
- output.append("Dummy Coord Action SUCCESS");
- }
- }
-
- @Override
- public void onCoordinatorActionFailure(CoordinatorActionEvent cae) {
+ public void onCoordinatorActionEvent(CoordinatorActionEvent cae) {
if (cae != null) {
- output.append("Dummy Coord Action FAILURE");
- }
- }
-
- @Override
- public void onCoordinatorActionSuspend(CoordinatorActionEvent cae) {
- if (cae != null) {
- output.append("Dummy Coord Action SUSPEND");
- }
- }
-
- @Override
- public void onBundleJobStart(BundleJobEvent bje) {
- if (bje != null) {
- output.append("Dummy Bundle Job START");
- }
- }
-
- @Override
- public void onBundleJobSuccess(BundleJobEvent bje) {
- if (bje != null) {
- output.append("Dummy Bundle Job SUCCESS");
- }
- }
-
- @Override
- public void onBundleJobFailure(BundleJobEvent bje) {
- if (bje != null) {
- output.append("Dummy Bundle Job FAILURE");
+ output.append("Dummy Coord Action event "+cae.getEventStatus());
}
}
@Override
- public void onBundleJobSuspend(BundleJobEvent bje) {
+ public void onBundleJobEvent(BundleJobEvent bje) {
if (bje != null) {
- output.append("Dummy Bundle Job SUSPEND");
+ output.append("Dummy Bundle Job event "+bje.getEventStatus());
}
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java Wed Apr 17 23:55:20 2013
@@ -56,10 +56,10 @@ public class TestJMSAccessorService exte
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
// both servers should connect to default JMS server
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
- ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo);
+ ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo, true);
assertTrue(ctxt1.isConnectionInitialized());
JMSConnectionInfo connInfo1 = hcatService.getJMSConnectionInfo(new URI("http://unknown:80"));
- ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1);
+ ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1, true);
assertTrue(ctxt2.isConnectionInitialized());
assertEquals(ctxt1, ctxt2);
ctxt1.close();
@@ -213,7 +213,7 @@ public class TestJMSAccessorService exte
assertTrue(jmsService.isListeningToTopic(connInfo, topic));
assertFalse(jmsService.isConnectionInRetryList(connInfo));
assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
- ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
+ ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo, true);
broker.stop();
try {
connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
Added: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestJMSTopicService extends XDataTestCase {
+
+ private Services services;
+
+ @Before
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = setupServicesForTopic();
+ services.init();
+ }
+
+ @After
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ private Services setupServicesForTopic() throws ServiceException {
+ Services services = new Services();
+ services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName());
+ return services;
+ }
+
+ @Test
+ public void testTopicAsUser() {
+ try {
+ JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+ WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ assertEquals(wfj.getUser(), jmsTopicService.getTopic(wfj.getId()));
+ WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+ assertEquals(wfj.getUser(), jmsTopicService.getTopic(wab.getId()));
+ CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+ assertEquals(wfj.getUser(), jmsTopicService.getTopic(cjb.getId()));
+ CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-for-action-input-check.xml", 0);
+ assertEquals(wfj.getUser(), jmsTopicService.getTopic(cab.getId()));
+ BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+ assertEquals(wfj.getUser(), jmsTopicService.getTopic(bjb.getId()));
+ BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+ assertEquals(wfj.getUser(), jmsTopicService.getTopic(bab.getBundleActionId()));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testTopicAsJobId() {
+ try {
+ services.destroy();
+ services = setupServicesForTopic();
+ services.getConf().set(JMSTopicService.TOPIC_NAME, "default=" + JMSTopicService.TopicType.JOBID.getValue());
+ services.init();
+ JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+ WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ assertEquals(wfj.getId(), jmsTopicService.getTopic(wfj.getId()));
+ WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+ assertEquals(wfj.getId(), jmsTopicService.getTopic(wab.getId()));
+ CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+ assertEquals(cjb.getId(), jmsTopicService.getTopic(cjb.getId()));
+ CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-for-action-input-check.xml", 0);
+ assertEquals(cjb.getId(), jmsTopicService.getTopic(cab.getId()));
+ BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+ assertEquals(bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
+ BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+ assertEquals(bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testTopicAsFixedString() {
+ try {
+ services.destroy();
+ services = setupServicesForTopic();
+ services.getConf().set(
+ JMSTopicService.TOPIC_NAME,
+ JMSTopicService.JobType.WORKFLOW.getValue() + " =workflow,"
+ + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord,"
+ + JMSTopicService.JobType.BUNDLE.getValue() + "=bundle");
+ services.init();
+ JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+ WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+ WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+ assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+ CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+ assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+ CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-for-action-input-check.xml", 0);
+ assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+ BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+ assertEquals("bundle", jmsTopicService.getTopic(bjb.getId()));
+ BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+ assertEquals("bundle", jmsTopicService.getTopic(bab.getBundleActionId()));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testMixedTopic1() {
+ try {
+ services.destroy();
+ services = setupServicesForTopic();
+ services.getConf().set(
+ JMSTopicService.TOPIC_NAME,
+ JMSTopicService.JobType.WORKFLOW.getValue() + " = workflow,"
+ + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord, default = "
+ + JMSTopicService.TopicType.JOBID.getValue());
+ services.init();
+ JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+ WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+ WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+ assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+ CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+ assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+ CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-for-action-input-check.xml", 0);
+ assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+ BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+ assertEquals(bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
+ BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+ assertEquals(bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testMixedTopic2() {
+ try {
+ services.destroy();
+ services = setupServicesForTopic();
+ services.getConf().set(
+ JMSTopicService.TOPIC_NAME,
+ JMSTopicService.JobType.WORKFLOW.getValue() + " = workflow,"
+ + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord");
+ services.init();
+ JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+ WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+ WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
+ assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+ CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+ assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+ CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-for-action-input-check.xml", 0);
+ assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+ BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
+ // As no default is specified, user will be considered as topic
+ assertEquals(bjb.getUser(), jmsTopicService.getTopic(bjb.getId()));
+ BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
+ assertEquals(bjb.getUser(), jmsTopicService.getTopic(bab.getBundleActionId()));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testIncorrectConfigurationJobType() {
+ try {
+ services.destroy();
+ services = setupServicesForTopic();
+ services.getConf().set(JMSTopicService.TOPIC_NAME,
+ "InvalidJobType" + " = workflow," + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord");
+ services.init();
+ fail("Expected Service Exception");
+ }
+ catch (ServiceException se) {
+ assertTrue(se.getMessage().contains("Incorrect job type"));
+ }
+ }
+
+ @Test
+ public void testIncorrectConfigurationDefault() {
+ try {
+ services.destroy();
+ services = setupServicesForTopic();
+ services.getConf().set(JMSTopicService.TOPIC_NAME, "default=" + "invalidvalue");
+ services.init();
+ fail("Expected Service Exception");
+ }
+ catch (ServiceException se) {
+ assertTrue(se.getMessage().contains("not allowed in default"));
+ }
+ }
+
+}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,11 +30,13 @@ import org.apache.oozie.DagEngineExcepti
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
import org.apache.oozie.client.rest.JsonWorkflowAction;
import org.apache.oozie.client.rest.JsonWorkflowJob;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.util.XmlUtils;
+import org.json.simple.JSONValue;
public class MockDagEngineService extends DagEngineService {
public static final String JOB_ID = "job-";
@@ -176,6 +178,12 @@ public class MockDagEngineService extend
}
@Override
+ public JMSConnectionInfoBean getJMSConnectionInfo(String jobId) throws DagEngineException {
+ did = RestConstants.JOB_SHOW_JMS_INFO;
+ return createDummyJMSConnectionInfo();
+ }
+
+ @Override
public String getDefinition(String jobId) throws DagEngineException {
did = RestConstants.JOB_SHOW_DEFINITION;
int idx = validateWorkflowIdx(jobId);
@@ -227,6 +235,16 @@ public class MockDagEngineService extend
}
}
+ private static JMSConnectionInfoBean createDummyJMSConnectionInfo() {
+ JMSConnectionInfoBean jmsBean = new JMSConnectionInfoBean();
+ Properties jmsProps = new Properties();
+ jmsProps.setProperty("k1", "v1");
+ jmsProps.setProperty("k2", "v2");
+ jmsBean.setJNDIProperties(jmsProps);
+ jmsBean.setTopicName("topic");
+ return jmsBean;
+ }
+
private static WorkflowJob createDummyWorkflow(int idx) {
JsonWorkflowJob workflow = new JsonWorkflowJob();
workflow.setId(JOB_ID + idx + JOB_ID_END);
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java Wed Apr 17 23:55:20 2013
@@ -337,4 +337,22 @@ public class TestV1JobServlet extends Da
}
});
}
+
+ public void testJMSInfo() throws Exception {
+ runTest("/v1/job/*", V1JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MockDagEngineService.reset();
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_JMS_INFO);
+ URL url = createURL(MockDagEngineService.JOB_ID + 1 + MockDagEngineService.JOB_ID_END, params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+ assertEquals(RestConstants.JOB_SHOW_JMS_INFO, MockDagEngineService.did);
+ return null;
+ }
+ });
+ }
+
}
Modified: oozie/trunk/pom.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/pom.xml?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/pom.xml (original)
+++ oozie/trunk/pom.xml Wed Apr 17 23:55:20 2013
@@ -272,6 +272,18 @@
<version>1.6.6</version>
</dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.8.8</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>1.8.8</version>
+ </dependency>
+
<!-- core -->
<dependency>
<groupId>org.apache.oozie</groupId>
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Apr 17 23:55:20 2013
@@ -1,5 +1,7 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1235 Client API for retrieving topic and jms connection related details (virag)
+OOZIE-1234 JMS Event Listeners for publishing notifications related to workflow and coordinator (virag)
OOZIE-1281 Hiveaction should populate externalChildIDs (rohini via virag)
OOZIE-1322 show child job URL tab selectively for pig action (ryota via mona)
OOZIE-1307 Cover package org.apache.oozie.action.ssh with unit tests (vbondarev via rkanter)