You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/05/08 19:59:53 UTC
svn commit: r1480380 [2/2] - in /oozie/trunk: ./
client/src/main/java/org/apache/oozie/cli/
client/src/main/java/org/apache/oozie/client/
client/src/main/java/org/apache/oozie/client/event/
client/src/main/java/org/apache/oozie/client/event/jms/ client...
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java Wed May 8 17:59:52 2013
@@ -17,10 +17,16 @@
*/
package org.apache.oozie.servlet;
+import java.io.IOException;
+
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.oozie.DagEngine;
+import org.apache.oozie.DagEngineException;
import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.service.DagEngineService;
+import org.apache.oozie.service.Services;
@SuppressWarnings("serial")
public class V2JobServlet extends V1JobServlet {
@@ -42,4 +48,21 @@ public class V2JobServlet extends V1JobS
JsonBean actionBean = super.getWorkflowActionBean(request, response);
return actionBean;
}
+
+
+ @Override
+ protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+ IOException {
+ String topicName;
+ String jobId = getResourceName(request);
+ DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
+ getAuthToken(request));
+ try {
+ topicName = dagEngine.getJMSTopicName(jobId);
+ }
+ catch (DagEngineException ex) {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+ }
+ return topicName;
+ }
}
Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Wed May 8 17:59:52 2013
@@ -164,14 +164,14 @@
bundle job and bundle action
WORKFLOW=workflow,
COORDINATOR=coordinator,
- BUNDLE={jobId}
+ BUNDLE=${jobId}
For jobs with no defined topic, default topic will be ${username}
</description>
</property>
<!-- JMS Producer connection -->
<property>
- <name>oozie.jms.producer.notification.connection</name>
+ <name>oozie.jms.producer.connection.properties</name>
<value>java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616;connectionFactoryNames#ConnectionFactory</value>
</property>
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java Wed May 8 17:59:52 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,6 +34,7 @@ import org.apache.oozie.servlet.MockDagE
import org.apache.oozie.servlet.V1AdminServlet;
import org.apache.oozie.servlet.V1JobServlet;
import org.apache.oozie.servlet.V1JobsServlet;
+import org.apache.oozie.servlet.V2AdminServlet;
import org.apache.oozie.servlet.V2JobServlet;
import org.apache.oozie.util.XConfiguration;
@@ -45,15 +46,15 @@ public class TestOozieCLI extends DagSer
new V1JobServlet();
new V1JobsServlet();
new V1AdminServlet();
+ new V2AdminServlet();
new V2JobServlet();
}
static final boolean IS_SECURITY_ENABLED = false;
static final String VERSION = "/v" + OozieClient.WS_PROTOCOL_VERSION;
static final String[] END_POINTS = {"/versions", VERSION + "/jobs", VERSION + "/job/*", VERSION + "/admin/*"};
- static final Class[] SERVLET_CLASSES =
- { HeaderTestingVersionServlet.class, V1JobsServlet.class, V1JobServlet.class,
- V1AdminServlet.class, V2JobServlet.class };
+ static final Class[] SERVLET_CLASSES = { HeaderTestingVersionServlet.class, V1JobsServlet.class,
+ V1JobServlet.class, V1AdminServlet.class, V2JobServlet.class, V2AdminServlet.class };
@Override
protected void setUp() throws Exception {
@@ -300,7 +301,7 @@ public class TestOozieCLI extends DagSer
/**
* Check if "-debug" option is accepted at CLI with job run command
- *
+ *
* @throws Exception
*/
public void testRunWithDebug() throws Exception {
@@ -518,12 +519,12 @@ public class TestOozieCLI extends DagSer
MockDagEngineService.JOB_ID + "1" + MockDagEngineService.JOB_ID_END};
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_SHOW_INFO, MockDagEngineService.did);
-
+
args = new String[]{"job", "-oozie", oozieUrl, "-info", MockDagEngineService.JOB_ID + "2" +
MockDagEngineService.JOB_ID_END};
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_SHOW_INFO, MockDagEngineService.did);
-
+
args = new String[]{"job", "-oozie", oozieUrl, "-info",
MockDagEngineService.JOB_ID + (MockDagEngineService.workflows.size() + 1)};
assertEquals(-1, new OozieCLI().run(args));
@@ -545,12 +546,12 @@ public class TestOozieCLI extends DagSer
"name=x"};
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOBS_FILTER_PARAM, MockDagEngineService.did);
-
+
args = new String[]{"jobs", "-timezone", "PST", "-len", "3", "-offset", "2", "-oozie", oozieUrl,
"-filter", "name=x"};
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOBS_FILTER_PARAM, MockDagEngineService.did);
-
+
args = new String[]{"jobs", "-jobtype", "coord", "-filter", "status=FAILED", "-oozie", oozieUrl};
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOBS_FILTER_PARAM, MockDagEngineService.did);
@@ -711,7 +712,7 @@ public class TestOozieCLI extends DagSer
}
});
}
-
+
public void testInfo() throws Exception {
String[] args = new String[]{"info"};
assertEquals(0, new OozieCLI().run(args));
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java Wed May 8 17:59:52 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,6 +32,7 @@ import org.apache.oozie.servlet.V0JobsSe
import org.apache.oozie.servlet.V1AdminServlet;
import org.apache.oozie.servlet.V1JobServlet;
import org.apache.oozie.servlet.V1JobsServlet;
+import org.apache.oozie.servlet.V2AdminServlet;
import org.apache.oozie.servlet.V2JobServlet;
public class TestWorkflowClient extends DagServletTestCase {
@@ -44,13 +45,15 @@ public class TestWorkflowClient extends
new V1AdminServlet();
new V1JobServlet();
new V2JobServlet();
+ new V2AdminServlet();
}
private static final boolean IS_SECURITY_ENABLED = false;
static final String VERSION = "/v" + OozieClient.WS_PROTOCOL_VERSION;
static final String[] END_POINTS = {"/versions", VERSION + "/jobs", VERSION + "/job/*", VERSION + "/admin/*"};
- static final Class[] SERVLET_CLASSES = {HeaderTestingVersionServlet.class, V0JobsServlet.class,
- V0JobServlet.class, V1AdminServlet.class, V1JobServlet.class, V2JobServlet.class, V1JobsServlet.class};
+ static final Class[] SERVLET_CLASSES = { HeaderTestingVersionServlet.class, V0JobsServlet.class,
+ V0JobServlet.class, V1AdminServlet.class, V2AdminServlet.class, V1JobServlet.class, V2JobServlet.class,
+ V1JobsServlet.class };
protected void setUp() throws Exception {
super.setUp();
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java Wed May 8 17:59:52 2013
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command;
-
-import java.util.Properties;
-
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.jms.JMSJobEventListener;
-import org.apache.oozie.service.JMSAccessorService;
-import org.apache.oozie.service.JMSTopicService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.workflow.WorkflowInstance;
-import org.junit.Test;
-
-public class TestJMSInfoXCommand extends XDataTestCase {
-
- private Services services;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- services = new Services();
- services.init();
- }
-
- @Override
- protected void tearDown() throws Exception {
- services.destroy();
- super.tearDown();
- }
-
- @Test
- public void testConnectionNotEnabled() {
- try {
- WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
- new JMSInfoXCommand(wfj.getId()).call();
- }
- catch (CommandException e) {
- assertEquals(ErrorCode.E1601, e.getErrorCode());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testJMSConnectionInfo() {
- try {
- services.destroy();
- services = new Services();
- services.getConf().set(
- JMSJobEventListener.JMS_CONNECTION_PROPERTIES,
- "java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#"
- + localActiveMQBroker + ";" + "connectionFactoryNames#" + "ConnectionFactory");
- services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName());
- services.init();
-
- WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
- JMSInfoXCommand jmsInfoCmd = new JMSInfoXCommand(wfj.getId());
- JMSConnectionInfoBean jmsBean = jmsInfoCmd.call();
- assertEquals("test", jmsBean.getTopicName());
- Properties props = jmsBean.getJNDIProperties();
- assertEquals(ActiveMQConnFactory, props.get("java.naming.factory.initial"));
- assertEquals(localActiveMQBroker, props.get("java.naming.provider.url"));
- assertEquals("ConnectionFactory", props.get("connectionFactoryNames"));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java Wed May 8 17:59:52 2013
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.jms;
+import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.hadoop.conf.Configuration;
@@ -54,11 +55,11 @@ public class TestDefaultConnectionContex
}
@Test
- public void testThreadLocalSession() {
+ public void testThreadLocalSession() throws JMSException {
String jmsProps = services.getConf().get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
JMSConnectionInfo connInfo = new JMSConnectionInfo(jmsProps);
ConnectionContext jmsContext = Services.get().get(JMSAccessorService.class)
- .createConnectionContext(connInfo, false);
+ .createConnectionContext(connInfo);
Thread th = new Thread(new SessionThread(jmsContext));
th.start();
try {
@@ -69,25 +70,17 @@ public class TestDefaultConnectionContex
}
assertEquals(session1, session2);
- ThreadLocal<Session> threadLocal1 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
- Session session3 = threadLocal1.get();
- ThreadLocal<Session> threadLocal2 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
- Session session4 = threadLocal2.get();
+ Session session3 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
+ Session session4 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
// As session3 and session4 are created by same threads, they should be
// equal
assertTrue(session3.equals(session4));
// As session1 and session3 are created by diff threads, they shoudn't
// be equal
assertFalse(session1.equals(session3));
- threadLocal1.remove();
- ThreadLocal<Session> threadLocal3 = jmsContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
- Session session5 = threadLocal3.get();
- assertFalse(session3.equals(session5));
-
}
class SessionThread implements Runnable {
-
private ConnectionContext connContext;
SessionThread(ConnectionContext connContext) {
@@ -96,10 +89,14 @@ public class TestDefaultConnectionContex
@Override
public void run() {
- ThreadLocal<Session> th1 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
- session1 = th1.get();
- ThreadLocal<Session> th2 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
- session2 = th2.get();
+ try {
+ session1 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
+ session2 = connContext.createThreadLocalSession(Session.AUTO_ACKNOWLEDGE);
+ }
+ catch (JMSException e) {
+ e.printStackTrace();
+ }
+
}
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java Wed May 8 17:59:52 2013
@@ -89,7 +89,7 @@ public class TestJMSJobEventListener ext
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
assertFalse(message.getText().contains("endTime"));
- WorkflowJobMessage wfStartMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message);
assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
assertEquals(startDate, wfStartMessage.getStartTime());
assertEquals("wfId1", wfStartMessage.getId());
@@ -122,7 +122,7 @@ public class TestJMSJobEventListener ext
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfSuccMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message);
assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
assertEquals(startDate, wfSuccMessage.getStartTime());
assertEquals(endDate, wfSuccMessage.getEndTime());
@@ -158,7 +158,7 @@ public class TestJMSJobEventListener ext
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
assertEquals(startDate, wfFailMessage.getStartTime());
assertEquals(endDate, wfFailMessage.getEndTime());
@@ -193,7 +193,7 @@ public class TestJMSJobEventListener ext
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
assertFalse(message.getText().contains("endTime"));
- WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus());
assertEquals(startDate, wfFailMessage.getStartTime());
assertEquals("wfId1", wfFailMessage.getId());
@@ -226,7 +226,7 @@ public class TestJMSJobEventListener ext
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
assertEquals("user_1", wfFailMessage.getUser());
assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
@@ -276,7 +276,7 @@ public class TestJMSJobEventListener ext
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
assertEquals("user1", wfFailMessage.getUser());
assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
@@ -303,7 +303,7 @@ public class TestJMSJobEventListener ext
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfFailMessage = (WorkflowJobMessage) JMSMessagingUtils.getEventMessage(message);
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
assertEquals("user1", wfFailMessage.getUser());
assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
@@ -339,8 +339,7 @@ public class TestJMSJobEventListener ext
assertNotNull(jmsContext);
broker.stop();
jmsContext = getConnectionContext();
- // Exception Listener should have removed the conn context from
- // connection map
+ // Exception Listener should have removed the old conn context
assertNull(jmsContext);
broker = new BrokerService();
broker.addConnector(brokerURl);
@@ -381,7 +380,7 @@ public class TestJMSJobEventListener ext
assertFalse(message.getText().contains("endTime"));
assertFalse(message.getText().contains("errorCode"));
assertFalse(message.getText().contains("errorMessage"));
- CoordinatorActionMessage coordActionWaitingMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils
.getEventMessage(message);
assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
assertEquals(startDate, coordActionWaitingMessage.getStartTime());
@@ -419,7 +418,7 @@ public class TestJMSJobEventListener ext
assertFalse(message.getText().contains("errorCode"));
assertFalse(message.getText().contains("errorMessage"));
assertFalse(message.getText().contains("missingDependency"));
- CoordinatorActionMessage coordActionStartMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils
.getEventMessage(message);
assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
assertEquals(startDate, coordActionStartMessage.getStartTime());
@@ -456,7 +455,7 @@ public class TestJMSJobEventListener ext
assertFalse(message.getText().contains("errorCode"));
assertFalse(message.getText().contains("errorMessage"));
assertFalse(message.getText().contains("missingDependency"));
- CoordinatorActionMessage coordActionSuccessMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils
.getEventMessage(message);
assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
assertEquals(startDate, coordActionSuccessMessage.getStartTime());
@@ -494,7 +493,7 @@ public class TestJMSJobEventListener ext
coordEventListener.onCoordinatorActionEvent(cae);
TextMessage message = (TextMessage) consumer.receive(5000);
assertFalse(message.getText().contains("missingDependency"));
- CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
.getEventMessage(message);
assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
assertEquals(startDate, coordActionFailMessage.getStartTime());
@@ -530,7 +529,7 @@ public class TestJMSJobEventListener ext
MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
coordEventListener.onCoordinatorActionEvent(cae);
TextMessage message = (TextMessage) consumer.receive(5000);
- CoordinatorActionMessage coordActionFailMessage = (CoordinatorActionMessage) JMSMessagingUtils
+ CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
.getEventMessage(message);
Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
assertEquals("user1", coordActionFailMessage.getUser());
@@ -571,7 +570,7 @@ public class TestJMSJobEventListener ext
String jmsProps = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
JMSConnectionInfo connInfo = new JMSConnectionInfo(jmsProps);
JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
- ConnectionContext jmsContext = jmsService.createConnectionContext(connInfo, false);
+ ConnectionContext jmsContext = jmsService.createProducerConnectionContext(connInfo);
return jmsContext;
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java Wed May 8 17:59:52 2013
@@ -1,87 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.jms;
-
-import java.util.Properties;
-
-import org.apache.oozie.BundleActionBean;
-import org.apache.oozie.BundleJobBean;
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.service.JMSAccessorService;
-import org.apache.oozie.service.JMSTopicService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.workflow.WorkflowInstance;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestJMSServerInfo extends XDataTestCase {
-
- private Services services;
-
- @Before
- protected void setUp() throws Exception {
- super.setUp();
- services = new Services();
- services.getConf().set(
- JMSJobEventListener.JMS_CONNECTION_PROPERTIES,
- "java.naming.factory.initial#" + ActiveMQConnFactory + ";" + "java.naming.provider.url#"
- + localActiveMQBroker + ";" + "connectionFactoryNames#" + "ConnectionFactory");
- services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, JMSTopicService.class.getName());
- services.init();
- }
-
- @After
- protected void tearDown() throws Exception {
- services.destroy();
- super.tearDown();
- }
-
- @Test
- public void testJMSConnectionInfo() {
- try {
- JMSServerInfo jmsServerInfo = new DefaultJMSServerInfo();
- String connectionProperties = Services.get().getConf()
- .get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
- WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
- JMSConnectionInfoBean jmsBean = jmsServerInfo.getJMSConnectionInfoBean(connectionProperties, wfj.getId());
- assertEquals(wfj.getUser(), jmsBean.getTopicName());
- Properties props = jmsBean.getJNDIProperties();
- assertEquals(ActiveMQConnFactory, props.get("java.naming.factory.initial"));
- assertEquals(localActiveMQBroker, props.get("java.naming.provider.url"));
- assertEquals("ConnectionFactory", props.get("connectionFactoryNames"));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java Wed May 8 17:59:52 2013
@@ -56,10 +56,10 @@ public class TestJMSAccessorService exte
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
// both servers should connect to default JMS server
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
- ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo, true);
+ ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo);
assertTrue(ctxt1.isConnectionInitialized());
JMSConnectionInfo connInfo1 = hcatService.getJMSConnectionInfo(new URI("http://unknown:80"));
- ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1, true);
+ ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1);
assertTrue(ctxt2.isConnectionInitialized());
assertEquals(ctxt1, ctxt2);
ctxt1.close();
@@ -213,13 +213,13 @@ public class TestJMSAccessorService exte
assertTrue(jmsService.isListeningToTopic(connInfo, topic));
assertFalse(jmsService.isConnectionInRetryList(connInfo));
assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
- ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo, true);
+ ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
broker.stop();
try {
connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
fail("Exception expected");
}
- catch (JMSException e) {
+ catch (Exception e) {
Thread.sleep(100);
assertFalse(jmsService.isListeningToTopic(connInfo, topic));
assertTrue(jmsService.isConnectionInRetryList(connInfo));
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java Wed May 8 17:59:52 2013
@@ -18,6 +18,8 @@
package org.apache.oozie.service;
+import java.util.Properties;
+
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
@@ -29,6 +31,8 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.Job;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.service.JMSTopicService.JobType;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.workflow.WorkflowInstance;
@@ -87,24 +91,27 @@ public class TestJMSTopicService extends
@Test
public void testTopicAsJobId() {
try {
+ final String TOPIC_PREFIX = "oozie.";
services.destroy();
services = setupServicesForTopic();
services.getConf().set(JMSTopicService.TOPIC_NAME, "default=" + JMSTopicService.TopicType.JOBID.getValue());
+ services.getConf().set(JMSTopicService.TOPIC_PREFIX, TOPIC_PREFIX);
services.init();
JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
- assertEquals(wfj.getId(), jmsTopicService.getTopic(wfj.getId()));
+ assertEquals(TOPIC_PREFIX, jmsTopicService.getTopicPrefix());
+ assertEquals(TOPIC_PREFIX+wfj.getId(), jmsTopicService.getTopic(wfj.getId()));
WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
- assertEquals(wfj.getId(), jmsTopicService.getTopic(wab.getId()));
+ assertEquals(TOPIC_PREFIX+wfj.getId(), jmsTopicService.getTopic(wab.getId()));
CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
- assertEquals(cjb.getId(), jmsTopicService.getTopic(cjb.getId()));
+ assertEquals(TOPIC_PREFIX+cjb.getId(), jmsTopicService.getTopic(cjb.getId()));
CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
"coord-action-for-action-input-check.xml", 0);
- assertEquals(cjb.getId(), jmsTopicService.getTopic(cab.getId()));
+ assertEquals(TOPIC_PREFIX+cjb.getId(), jmsTopicService.getTopic(cab.getId()));
BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
- assertEquals(bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
+ assertEquals(TOPIC_PREFIX+bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
- assertEquals(bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
+ assertEquals(TOPIC_PREFIX+bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
}
catch (Exception e) {
e.printStackTrace();
@@ -241,4 +248,56 @@ public class TestJMSTopicService extends
}
}
+ @Test
+ public void testTopicProperties1() {
+ try {
+ services.destroy();
+ services = setupServicesForTopic();
+ services.init();
+ JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+ Properties props = jmsTopicService.getTopicPatternProperties();
+ assertEquals("${username}", props.get(AppType.WORKFLOW_JOB));
+ assertEquals("${username}", props.get(AppType.WORKFLOW_ACTION));
+ assertEquals("${username}", props.get(AppType.COORDINATOR_JOB));
+ assertEquals("${username}", props.get(AppType.COORDINATOR_ACTION));
+ assertEquals("${username}", props.get(AppType.BUNDLE_JOB));
+ assertEquals("${username}", props.get(AppType.BUNDLE_ACTION));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+ @Test
+ public void testTopicProperties2() {
+ try {
+ services.destroy();
+ services = setupServicesForTopic();
+ services.getConf().set(
+ JMSTopicService.TOPIC_NAME,
+ JMSTopicService.JobType.WORKFLOW.getValue() + " = workflow,"
+ + JMSTopicService.JobType.COORDINATOR.getValue() + "=coord");
+ services.init();
+ JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+ Properties props = jmsTopicService.getTopicPatternProperties();
+ assertEquals("workflow", props.get(AppType.WORKFLOW_JOB));
+ assertEquals("workflow", props.get(AppType.WORKFLOW_ACTION));
+
+ assertEquals("coord", props.get(AppType.COORDINATOR_JOB));
+ assertEquals("coord", props.get(AppType.COORDINATOR_ACTION));
+
+ assertEquals("${username}", props.get(AppType.BUNDLE_JOB));
+ assertEquals("${username}", props.get(AppType.BUNDLE_ACTION));
+
+ services.destroy();
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java Wed May 8 17:59:52 2013
@@ -178,12 +178,6 @@ public class MockDagEngineService extend
}
@Override
- public JMSConnectionInfoBean getJMSConnectionInfo(String jobId) throws DagEngineException {
- did = RestConstants.JOB_SHOW_JMS_INFO;
- return createDummyJMSConnectionInfo();
- }
-
- @Override
public String getDefinition(String jobId) throws DagEngineException {
did = RestConstants.JOB_SHOW_DEFINITION;
int idx = validateWorkflowIdx(jobId);
@@ -241,7 +235,7 @@ public class MockDagEngineService extend
jmsProps.setProperty("k1", "v1");
jmsProps.setProperty("k2", "v2");
jmsBean.setJNDIProperties(jmsProps);
- jmsBean.setTopicName("topic");
+ jmsBean.setTopicPrefix("topicPrefix");
return jmsBean;
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java Wed May 8 17:59:52 2013
@@ -337,22 +337,4 @@ public class TestV1JobServlet extends Da
}
});
}
-
- public void testJMSInfo() throws Exception {
- runTest("/v1/job/*", V1JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- MockDagEngineService.reset();
- Map<String, String> params = new HashMap<String, String>();
- params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_JMS_INFO);
- URL url = createURL(MockDagEngineService.JOB_ID + 1 + MockDagEngineService.JOB_ID_END, params);
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setRequestMethod("GET");
- assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
- assertEquals(RestConstants.JOB_SHOW_JMS_INFO, MockDagEngineService.did);
- return null;
- }
- });
- }
-
}
Modified: oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki
URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki Wed May 8 17:59:52 2013
@@ -910,14 +910,32 @@ Assuming Oozie is runing at =OOZIE_URL=,
* <OOZIE_URL>/v2/job
* <OOZIE_URL>/v2/jobs
-Please note that v1 and v2 are almost identical.
-Only difference is the JSON format of Job Information API (*/job) particularly for map-reduce action.
-No change for other actions.
+*Changes in v2 job API:*
+There is a difference in the JSON format of Job Information API (*/job) particularly for map-reduce action.
+No change for other actions.
In v1, externalId and consoleUrl point to spawned child job ID, and exteranlChildIDs is null in map-reduce action.
In v2, externalId and consoleUrl point to launcher job ID, and exteranlChildIDs is spawned child job ID in map-reduce action.
-v2/admin, v2/jobs remain the same with v1/admin, v1/jobs
+v2 supports retrieving of JMS topic on which job notifications are sent
+
+*REST API URL:*
+
+<verbatim>
+GET http://localhost:11000/oozie/v2/job/0000002-130507145349661-oozie-vira-W?show=jmstopic
+</verbatim>
+
+*Changes in v2 admin API:*
+
+v2 adds support for retrieving JMS connection information related to JMS notifications.
+
+*REST API URL:*
+
+<verbatim>
+GET http://localhost:11000/oozie/v2/admin/jmsinfo
+</verbatim>
+
+v2/jobs remain the same as v1/jobs
---+++ Job and Jobs End-Points
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed May 8 17:59:52 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1347 Additions to JMS topic API (virag)
OOZIE-1231 Provide access to launcher job URL from web console when using Map Reduce action (ryota via virag)
OOZIE-1335 The launcher job should use uber mode in Hadoop 2 by default (rkanter)
OOZIE-1297 Add chgrp in FS action (ryota via virag)
Modified: oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml (original)
+++ oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml Wed May 8 17:59:52 2013
@@ -49,6 +49,13 @@
</servlet>
<servlet>
+ <servlet-name>v2admin</servlet-name>
+ <display-name>Oozie admin</display-name>
+ <servlet-class>org.apache.oozie.servlet.V2AdminServlet</servlet-class>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+
+ <servlet>
<servlet-name>callback</servlet-name>
<display-name>Callback Notification</display-name>
<servlet-class>org.apache.oozie.servlet.CallbackServlet</servlet-class>
@@ -114,7 +121,7 @@
</servlet-mapping>
<servlet-mapping>
- <servlet-name>v1admin</servlet-name>
+ <servlet-name>v2admin</servlet-name>
<url-pattern>/v2/admin/*</url-pattern>
</servlet-mapping>