You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/05/08 19:59:53 UTC
svn commit: r1480380 [1/2] - in /oozie/trunk: ./
client/src/main/java/org/apache/oozie/cli/
client/src/main/java/org/apache/oozie/client/
client/src/main/java/org/apache/oozie/client/event/
client/src/main/java/org/apache/oozie/client/event/jms/ client...
Author: virag
Date: Wed May 8 17:59:52 2013
New Revision: 1480380
URL: http://svn.apache.org/r1480380
Log:
OOZIE-1347 Additions to JMS topic API (virag)
Added:
oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java
Modified:
oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
oozie/trunk/core/src/test/java/org/apache/oozie/client/TestWorkflowClient.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java
oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java
oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java
oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki
oozie/trunk/release-log.txt
oozie/trunk/webapp/src/main/webapp/WEB-INF/web.xml
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java Wed May 8 17:59:52 2013
@@ -111,7 +111,6 @@ public class OozieCLI {
public static final String ACTION_OPTION = "action";
public static final String DEFINITION_OPTION = "definition";
public static final String CONFIG_CONTENT_OPTION = "configcontent";
- public static final String JMS_INFO_OPTION = "jmsinfo";
public static final String DO_AS_OPTION = "doas";
@@ -255,7 +254,6 @@ public class OozieCLI {
Option changeValue = new Option(CHANGE_VALUE_OPTION, true,
"new endtime/concurrency/pausetime value for changing a coordinator job");
Option info = new Option(INFO_OPTION, true, "info of a job");
- Option jmsInfo = new Option (JMS_INFO_OPTION, true, "JMS Topic name and JNDI connection string for a job");
Option offset = new Option(OFFSET_OPTION, true, "job info offset of actions (default '1', requires -info)");
Option len = new Option(LEN_OPTION, true, "number of actions (default TOTAL ACTIONS, requires -info)");
Option filter = new Option(FILTER_OPTION, true,
@@ -292,7 +290,6 @@ public class OozieCLI {
actions.addOption(kill);
actions.addOption(change);
actions.addOption(info);
- actions.addOption(jmsInfo);
actions.addOption(rerun);
actions.addOption(log);
actions.addOption(definition);
@@ -751,9 +748,6 @@ public class OozieCLI {
}
try {
- if (options.contains(JMS_INFO_OPTION)) {
- printJMSInfo(wc.getJMSConnectionInfo(commandLine.getOptionValue(JMS_INFO_OPTION)));
- }
if (options.contains(SUBMIT_OPTION)) {
System.out.println(JOB_ID_PREFIX + wc.submit(getConfiguration(wc, commandLine)));
}
@@ -1084,10 +1078,6 @@ public class OozieCLI {
}
}
- private void printJMSInfo (JMSConnectionInfo jmsInfo) {
- System.out.println("Topic Name : " + jmsInfo.getTopicName());
- System.out.println("JNDI Properties : " + jmsInfo.getJNDIProperties());
- }
private void printWorkflowAction(WorkflowAction action, String timeZoneId, boolean verbose) {
System.out.println("ID : " + maskIfNull(action.getId()));
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java Wed May 8 17:59:52 2013
@@ -19,6 +19,8 @@ package org.apache.oozie.client;
import java.util.Properties;
+import org.apache.oozie.client.event.Event;
+
/**
* JMS connection related information
*
@@ -26,10 +28,17 @@ import java.util.Properties;
public interface JMSConnectionInfo {
/**
- * Retrieve the JMS topic name
- * @return the topic name
+ * Get the topic prefix for a JMS topic
+ * @return JMS topic prefix
+ */
+ String getTopicPrefix();
+
+ /**
+ * Get the topic pattern given the app type of job
+ * @param appType the appType for a job
+ * @return JMS topic pattern
*/
- String getTopicName();
+ String getTopicPattern(Event.AppType appType);
/**
* Retrieve the JNDI properties for establishing connection to JMS server
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java?rev=1480380&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfoWrapper.java Wed May 8 17:59:52 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client;
+
+import java.util.Properties;
+
+/**
+ * Intermediate view of the JMS connection info in which the topic patterns are exposed as one Properties object.
+ * JsonToBean.createJMSConnectionInfo proxies the JSON response onto this interface and adapts it to JMSConnectionInfo.
+ */
+public interface JMSConnectionInfoWrapper {
+
+ /**
+ * Retrieve the JNDI properties (read from the JsonTags.JMS_JNDI_PROPERTIES field by JsonToBean)
+ * @return JNDI properties
+ */
+ Properties getJNDIProperties();
+
+ /**
+ * Get the topic prefix (read from the JsonTags.JMS_TOPIC_PREFIX field by JsonToBean)
+ * @return the topic prefix
+ */
+ String getTopicPrefix();
+
+ /**
+ * Get all topic patterns as Properties; JsonToBean looks entries up by Event.AppType name
+ * @return TopicPattern properties
+ */
+ Properties getTopicPatternProperties();
+}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java Wed May 8 17:59:52 2013
@@ -257,7 +257,7 @@ public class OozieClient {
if (!array.contains(WS_PROTOCOL_VERSION) && !array.contains(WS_PROTOCOL_VERSION_1)
&& !array.contains(WS_PROTOCOL_VERSION_0)) {
StringBuilder msg = new StringBuilder();
- msg.append("Supported version [").append(WS_PROTOCOL_VERSION_1).append(
+ msg.append("Supported version [").append(WS_PROTOCOL_VERSION).append(
"] or less, Unsupported versions[");
String separator = "";
for (Object version : array) {
@@ -681,9 +681,8 @@ public class OozieClient {
private class JMSInfo extends ClientCallable<JMSConnectionInfo> {
- JMSInfo(String jobId) {
- super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
- RestConstants.JOB_SHOW_JMS_INFO));
+ JMSInfo() {
+ super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams());
}
protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException {
@@ -732,12 +731,11 @@ public class OozieClient {
/**
* Get the JMS Connection info
- * @param jobId
* @return JMSConnectionInfo object
* @throws OozieClientException
*/
- public JMSConnectionInfo getJMSConnectionInfo(String jobId) throws OozieClientException {
- return new JMSInfo(jobId).call();
+ public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException {
+ return new JMSInfo().call();
}
/**
@@ -800,6 +798,36 @@ public class OozieClient {
}
/**
+ * Gets the JMS topic name for a particular job by issuing a JMSTopic call for that job id
+ * @param jobId given jobId; the JMSTopic constructor passes it through notEmpty
+ * @return the JMS topic name, i.e. the value JMSTopic reads from the JsonTags.JMS_TOPIC_NAME field of the response
+ * @throws OozieClientException propagated from the underlying JMSTopic call
+ */
+ public String getJMSTopicName(String jobId) throws OozieClientException {
+ return new JMSTopic(jobId).call();
+ }
+
+    /**
+     * Client call that fetches the JMS topic name of a single job.
+     * <p>
+     * Sends a GET to the job resource with the RestConstants.JOB_SHOW_PARAM parameter set to
+     * RestConstants.JOB_SHOW_JMS_TOPIC and reads the JsonTags.JMS_TOPIC_NAME field of the JSON response.
+     */
+    private class JMSTopic extends ClientCallable<String> {
+
+        /**
+         * @param jobId id of the job whose topic name is requested; checked with notEmpty
+         */
+        JMSTopic(String jobId) {
+            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
+                    RestConstants.JOB_SHOW_JMS_TOPIC));
+        }
+
+        /**
+         * Extracts the topic name from the server response.
+         *
+         * @param conn connection on which the request was issued
+         * @return the value stored under JsonTags.JMS_TOPIC_NAME; null only if the response is not HTTP 200
+         *         and handleError returns normally
+         * @throws IOException if reading or closing the response stream fails
+         * @throws OozieClientException declared for handleError
+         */
+        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
+            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+                handleError(conn);
+                return null;
+            }
+            // Decode explicitly as UTF-8 (the default JSON encoding) instead of the platform charset,
+            // and always release the response stream, even when parsing fails.
+            Reader reader = new InputStreamReader(conn.getInputStream(), "UTF-8");
+            try {
+                JSONObject json = (JSONObject) JSONValue.parse(reader);
+                return (String) json.get(JsonTags.JMS_TOPIC_NAME);
+            }
+            finally {
+                reader.close();
+            }
+        }
+    }
+
+ /**
* Get the definition of a workflow job.
*
* @param jobId job Id.
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/Event.java Wed May 8 17:59:52 2013
@@ -41,7 +41,8 @@ public interface Event {
WORKFLOW_ACTION,
COORDINATOR_JOB,
COORDINATOR_ACTION,
- BUNDLE_JOB
+ BUNDLE_JOB,
+ BUNDLE_ACTION
}
/**
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java Wed May 8 17:59:52 2013
@@ -23,11 +23,11 @@ package org.apache.oozie.client.event.jm
*/
public final class JMSHeaderConstants {
// JMS Application specific properties for selectors
- public static final String EVENT_STATUS = "EVENT_STATUS";
- public static final String APP_NAME = "APP_NAME";
- public static final String USER = "USER";
- public static final String MESSAGE_TYPE = "MESSAGE_TYPE";
- public static final String APP_TYPE = "APP_TYPE";
+ public static final String EVENT_STATUS = "eventStatus";
+ public static final String APP_NAME = "appName";
+ public static final String USER = "user";
+ public static final String MESSAGE_TYPE = "msgType";
+ public static final String APP_TYPE = "appType";
// JMS Header property
- public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT";
+ public static final String MESSAGE_FORMAT = "msgFormat";
}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java Wed May 8 17:59:52 2013
@@ -31,40 +31,52 @@ import org.apache.oozie.client.event.mes
* Client utility to convert JMS message to EventMessage object
*/
public class JMSMessagingUtils {
-
private static final String DESERIALIZER_PROP = "oozie.msg.deserializer.";
+ private static MessageDeserializer deserializer;
+ private static Properties jmsDeserializerInfo;
+ private static final String CLIENT_PROPERTIES = "oozie_client.properties";
+
+    /*
+     * One-time setup: if oozie_client.properties is not on the classpath, fall back to the JSON
+     * deserializer; otherwise load the properties so a deserializer can be looked up per message format.
+     */
+    static {
+        InputStream is = JMSMessagingUtils.class.getClassLoader().getResourceAsStream(CLIENT_PROPERTIES);
+        if (is == null) {
+            System.out.println("Using default JSON Deserializer");
+            deserializer = new JSONMessageDeserializer();
+        }
+        else {
+            jmsDeserializerInfo = new Properties();
+            try {
+                try {
+                    jmsDeserializerInfo.load(is);
+                }
+                finally {
+                    // Close in finally so the stream is released even when load() fails.
+                    is.close();
+                }
+            }
+            catch (IOException ioe) {
+                throw new RuntimeException("I/O error occured for " + CLIENT_PROPERTIES, ioe);
+            }
+        }
+    }
/**
* Constructs the EventMessage object from JMS message
*
+ * @param <T> the concrete EventMessage subtype the caller expects
* @param msg the JMS message
* @return the EventMessage
* @throws IOException
* @throws JMSException
*/
- public static EventMessage getEventMessage(Message msg) throws IOException, JMSException {
+ public static <T extends EventMessage> T getEventMessage(Message msg) throws IOException, JMSException {
if (msg == null) {
throw new IllegalArgumentException("Could not extract EventMessage as JMS message is null");
}
- TextMessage textMessage = (TextMessage) msg;
- String msgFormat = msg.getStringProperty(JMSHeaderConstants.MESSAGE_FORMAT);
-
- Properties jmsDeserializerInfo = new Properties();
- InputStream is = JMSMessagingUtils.class.getClassLoader().getResourceAsStream("oozie_client.properties");
-
- if (is == null) {
- System.out.println("Using default deserializer");
- return new JSONMessageDeserializer().getEventMessage(textMessage);
- }
- else {
- jmsDeserializerInfo.load(is);
- MessageDeserializer deserializer = getDeserializer((String) jmsDeserializerInfo.get(DESERIALIZER_PROP
- + msgFormat));
- return deserializer.getEventMessage(textMessage);
+ // Work on a local reference. The static field is only set by the static initializer (default JSON
+ // case); storing a format-specific deserializer there would pin the first message's format for all
+ // later messages and would be an unsynchronized write to shared state.
+ MessageDeserializer msgDeserializer = deserializer;
+ if (msgDeserializer == null) {
+ // Properties file present: pick the deserializer configured for this message's format.
+ String msgFormat = msg.getStringProperty(JMSHeaderConstants.MESSAGE_FORMAT);
+ msgDeserializer = getDeserializer(msgFormat);
}
+ return msgDeserializer.getEventMessage(msg);
}
- private static MessageDeserializer getDeserializer(String deserializerString) {
+ private static MessageDeserializer getDeserializer(String msgFormat) throws IOException {
+ String deserializerString = (String) jmsDeserializerInfo.get(DESERIALIZER_PROP + msgFormat);
if (deserializerString == null) {
return new JSONMessageDeserializer();
}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java Wed May 8 17:59:52 2013
@@ -19,18 +19,29 @@ package org.apache.oozie.client.event.jm
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.TextMessage;
+import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.Event.AppType;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.event.Event.MessageType;
import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.EventMessage;
import org.apache.oozie.client.event.message.WorkflowJobMessage;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
/**
* Message deserializer to convert from JSON to java object
*/
public class JSONMessageDeserializer extends MessageDeserializer {
+ static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
+
+ static {
+ mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
@Override
public <T> T getDeserializedObject(String messageBody, Class<T> clazz) {
try {
@@ -42,8 +53,9 @@ public class JSONMessageDeserializer ext
}
}
+ @SuppressWarnings("unchecked")
@Override
- public WorkflowJobMessage setPropertiesForObject(WorkflowJobMessage workflowJobMsg, Message message)
+ public WorkflowJobMessage setProperties(WorkflowJobMessage workflowJobMsg, Message message)
throws JMSException {
workflowJobMsg.setAppType(AppType.valueOf(message.getStringProperty(JMSHeaderConstants.APP_TYPE)));
workflowJobMsg.setMessageType(MessageType.valueOf(message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)));
@@ -54,8 +66,9 @@ public class JSONMessageDeserializer ext
}
+ @SuppressWarnings("unchecked")
@Override
- public CoordinatorActionMessage setPropertiesForObject(CoordinatorActionMessage coordActionMsg, Message message)
+ public CoordinatorActionMessage setProperties(CoordinatorActionMessage coordActionMsg, Message message)
throws JMSException {
coordActionMsg.setAppType(AppType.valueOf(message.getStringProperty(JMSHeaderConstants.APP_TYPE)));
coordActionMsg.setMessageType(MessageType.valueOf(message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)));
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java Wed May 8 17:59:52 2013
@@ -21,8 +21,6 @@ import org.apache.oozie.client.event.mes
import org.apache.oozie.client.event.message.EventMessage;
import org.apache.oozie.client.event.message.WorkflowJobMessage;
import org.apache.oozie.client.event.Event;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.DeserializationConfig;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.JMSException;
@@ -31,11 +29,6 @@ import javax.jms.JMSException;
* Class to deserialize the jms message to java object
*/
public abstract class MessageDeserializer {
- static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
-
- static {
- mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
/**
* Constructs the event message from JMS message
@@ -44,9 +37,10 @@ public abstract class MessageDeserialize
* @return EventMessage
* @throws JMSException
*/
- public EventMessage getEventMessage(Message message) throws JMSException {
- String appTypeString = message.getStringProperty(JMSHeaderConstants.APP_TYPE);
- String messageBody = ((TextMessage) message).getText();
+ public <T extends EventMessage> T getEventMessage(Message message) throws JMSException {
+ TextMessage textMessage = (TextMessage) message;
+ String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE);
+ String messageBody = textMessage.getText();
if (appTypeString == null || appTypeString.isEmpty() || messageBody == null || messageBody.isEmpty()) {
throw new IllegalArgumentException("Could not extract OozieEventMessage. "
@@ -56,11 +50,11 @@ public abstract class MessageDeserialize
switch (Event.AppType.valueOf(appTypeString)) {
case WORKFLOW_JOB:
WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
- return setPropertiesForObject(wfJobMsg, message);
+ return setProperties(wfJobMsg, textMessage);
case COORDINATOR_ACTION:
CoordinatorActionMessage caActionMsg = getDeserializedObject(messageBody,
CoordinatorActionMessage.class);
- return setPropertiesForObject(caActionMsg, message);
+ return setProperties(caActionMsg, textMessage);
default:
throw new UnsupportedOperationException("Conversion of " + appTypeString
+ " to Event message is not supported");
@@ -78,7 +72,7 @@ public abstract class MessageDeserialize
* @return WorkflowJobMessage
* @throws JMSException
*/
- public abstract WorkflowJobMessage setPropertiesForObject(WorkflowJobMessage wfJobMessage, Message message)
+ protected abstract <T extends EventMessage> T setProperties(WorkflowJobMessage wfJobMessage, Message message)
throws JMSException;
/**
@@ -89,7 +83,7 @@ public abstract class MessageDeserialize
* @return CoordinatorActionMessage
* @throws JMSException
*/
- public abstract CoordinatorActionMessage setPropertiesForObject(CoordinatorActionMessage coordActionMessage,
+ protected abstract <T extends EventMessage> T setProperties(CoordinatorActionMessage coordActionMessage,
Message message) throws JMSException;
}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java Wed May 8 17:59:52 2013
@@ -185,7 +185,7 @@ public class JobMessage extends EventMes
*
* @param properties the jms selector key value pair
*/
- public void setMessageProperties(Map<String, String> properties) {
+ void setMessageProperties(Map<String, String> properties) {
jmsMessageProperties = properties;
}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java Wed May 8 17:59:52 2013
@@ -196,7 +196,10 @@ public interface JsonTags {
public static final String TIME_ZOME_DISPLAY_NAME = "timezoneDisplayName";
public static final String TIME_ZONE_ID = "timezoneId";
- public static final String JMS_TOPIC_NAME = "jmsTopic";
+ public static final String JMS_TOPIC_PATTERN = "jmsTopicPattern";
public static final String JMS_JNDI_PROPERTIES = "jmsJNDIProps";
+ public static final String JMS_TOPIC_PREFIX = "jmsTopicPrefix";
+
+ public static final String JMS_TOPIC_NAME = "jmsTopicName";
}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java Wed May 8 17:59:52 2013
@@ -22,8 +22,10 @@ import org.apache.oozie.client.BundleJob
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.JMSConnectionInfo;
+import org.apache.oozie.client.JMSConnectionInfoWrapper;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event.AppType;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -184,8 +186,9 @@ public class JsonToBean {
BULK_RESPONSE.put("getCoordinator", new Property(JsonTags.BULK_RESPONSE_COORDINATOR, CoordinatorJob.class, true));
BULK_RESPONSE.put("getAction", new Property(JsonTags.BULK_RESPONSE_ACTION, CoordinatorAction.class, true));
- JMS_CONNECTION_INFO.put("getTopicName", new Property(JsonTags.JMS_TOPIC_NAME, String.class));
+ JMS_CONNECTION_INFO.put("getTopicPatternProperties", new Property(JsonTags.JMS_TOPIC_PATTERN, Properties.class));
JMS_CONNECTION_INFO.put("getJNDIProperties", new Property(JsonTags.JMS_JNDI_PROPERTIES, Properties.class));
+ JMS_CONNECTION_INFO.put("getTopicPrefix", new Property(JsonTags.JMS_TOPIC_PREFIX, String.class));
}
/**
@@ -365,8 +368,26 @@ public class JsonToBean {
* @return a JMS connection info bean populated with the JSON object values.
*/
public static JMSConnectionInfo createJMSConnectionInfo(JSONObject json) {
- return (JMSConnectionInfo) Proxy.newProxyInstance(JsonToBean.class.getClassLoader(),
- new Class[] { JMSConnectionInfo.class }, new JsonInvocationHandler(JMS_CONNECTION_INFO, json));
+ final JMSConnectionInfoWrapper jmsInfo = (JMSConnectionInfoWrapper) Proxy.newProxyInstance(
+ JsonToBean.class.getClassLoader(), new Class[] { JMSConnectionInfoWrapper.class },
+ new JsonInvocationHandler(JMS_CONNECTION_INFO, json));
+
+ return new JMSConnectionInfo() {
+ @Override
+ public String getTopicPrefix() {
+ return jmsInfo.getTopicPrefix();
+ }
+
+ @Override
+ public String getTopicPattern(AppType appType) {
+ return (String)jmsInfo.getTopicPatternProperties().get(appType.name());
+ }
+
+ @Override
+ public Properties getJNDIProperties() {
+ return jmsInfo.getJNDIProperties();
+ }
+ };
}
/**
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java Wed May 8 17:59:52 2013
@@ -151,5 +151,7 @@ public interface RestConstants {
public static final String ADMIN_TIME_ZONES_RESOURCE = "available-timezones";
- public static final String JOB_SHOW_JMS_INFO = "jmsinfo";
+ public static final String ADMIN_JMS_INFO = "jmsinfo";
+
+ public static final String JOB_SHOW_JMS_TOPIC = "jmstopic";
}
Modified: oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java (original)
+++ oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java Wed May 8 17:59:52 2013
@@ -21,8 +21,10 @@ import junit.framework.TestCase;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.JMSConnectionInfo;
+import org.apache.oozie.client.JMSConnectionInfoWrapper;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -318,7 +320,12 @@ public class TestJsonToBean extends Test
private JSONObject createJMSInfoJSONObject(){
JSONObject json = new JSONObject();
- json.put(JsonTags.JMS_TOPIC_NAME, "topic");
+ json.put(JsonTags.JMS_TOPIC_PREFIX, "topicPrefix");
+ Properties topicProps = new Properties();
+ topicProps.put(Event.AppType.WORKFLOW_JOB, "wfTopic");
+ topicProps.put(Event.AppType.WORKFLOW_ACTION, "wfTopic");
+ topicProps.put(Event.AppType.COORDINATOR_ACTION, "coordTopic");
+ json.put(JsonTags.JMS_TOPIC_PATTERN, JSONValue.toJSONString(topicProps));
Properties props = new Properties();
props.put("k1", "v1");
props.put("k2", "v2");
@@ -329,10 +336,14 @@ public class TestJsonToBean extends Test
public void testParseJMSInfo() {
JSONObject json = createJMSInfoJSONObject();
JMSConnectionInfo jmsDetails = JsonToBean.createJMSConnectionInfo(json);
- assertEquals("topic", jmsDetails.getTopicName());
+ assertEquals("topicPrefix", jmsDetails.getTopicPrefix());
+ assertEquals("wfTopic", jmsDetails.getTopicPattern(Event.AppType.WORKFLOW_JOB));
+ assertEquals("wfTopic", jmsDetails.getTopicPattern(Event.AppType.WORKFLOW_ACTION));
+ assertEquals("coordTopic", jmsDetails.getTopicPattern(Event.AppType.COORDINATOR_ACTION));
Properties jmsProps = jmsDetails.getJNDIProperties();
assertNotNull(jmsDetails.getJNDIProperties());
assertEquals("v1", jmsProps.get("k1"));
+ assertEquals("v2", jmsProps.get("k2"));
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java Wed May 8 17:59:52 2013
@@ -23,9 +23,9 @@ import java.io.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.JMSInfoXCommand;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JMSTopicService;
+import org.apache.oozie.service.Services;
public abstract class BaseEngine {
public static final String USE_XCOMMAND = "oozie.useXCommand";
@@ -197,19 +197,28 @@ public abstract class BaseEngine {
*/
public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
+
/**
- * Return the jms info about a job.
+ * Return the jms topic name for the job.
*
* @param jobId job Id.
- * @return the JMS info bean
+ * @return String the topic name
* @throws DagEngineException thrown if the jms info could not be obtained.
*/
- public JMSConnectionInfoBean getJMSConnectionInfo(String jobId) throws DagEngineException {
- try {
- return new JMSInfoXCommand(jobId).call();
+ public String getJMSTopicName(String jobId) throws DagEngineException {
+ JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+ if (jmsTopicService != null) {
+ try {
+ return jmsTopicService.getTopic(jobId);
+ }
+ catch (JPAExecutorException e) {
+ throw new DagEngineException(ErrorCode.E1602, e);
+ }
}
- catch (CommandException ex) {
- throw new DagEngineException(ex);
+ else {
+ throw new DagEngineException(ErrorCode.E1602,
+ "JMSTopicService is not initialized. JMS notification "
+ + "may not be enabled");
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java Wed May 8 17:59:52 2013
@@ -235,6 +235,7 @@ public enum ErrorCode {
E1501(XLog.STD, "Error in getting HCat Access [{0}]"),
E1601(XLog.STD, "Cannot retrieve JMS connection info [{0}]"),
+ E1602(XLog.STD, "Cannot retrieve Topic name [{0}]"),
ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java Wed May 8 17:59:52 2013
@@ -19,17 +19,19 @@ package org.apache.oozie.client.rest;
import java.util.Properties;
-import org.apache.oozie.client.JMSConnectionInfo;
+import org.apache.oozie.client.JMSConnectionInfoWrapper;
+import org.apache.oozie.client.rest.JsonTags;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
/**
* JMS connection info bean representing the JMS related information for a job
*
*/
-public class JMSConnectionInfoBean implements JMSConnectionInfo, JsonBean {
+public class JMSConnectionInfoBean implements JsonBean, JMSConnectionInfoWrapper {
private Properties JNDIProperties;
- private String topicName;
+ private String topicPrefix;
+ private Properties topicProperties;
@Override
@@ -38,14 +40,6 @@ public class JMSConnectionInfoBean imple
}
/**
- * Set the topic name
- * @param topicName
- */
- public void setTopicName(String topicName) {
- this.topicName = topicName;
- }
-
- /**
* Set the JNDI properties for jms connection
* @param JNDIProperties
*/
@@ -53,12 +47,6 @@ public class JMSConnectionInfoBean imple
this.JNDIProperties = JNDIProperties;
}
- @Override
- public String getTopicName() {
- return topicName;
- }
-
- @Override
public Properties getJNDIProperties() {
return JNDIProperties;
}
@@ -68,8 +56,35 @@ public class JMSConnectionInfoBean imple
public JSONObject toJSONObject(String timeZoneId) {
JSONObject json = new JSONObject();
json.put(JsonTags.JMS_JNDI_PROPERTIES, JSONValue.toJSONString(JNDIProperties));
- json.put(JsonTags.JMS_TOPIC_NAME, topicName);
+ json.put(JsonTags.JMS_TOPIC_PATTERN, JSONValue.toJSONString(topicProperties));
+ json.put(JsonTags.JMS_TOPIC_PREFIX, topicPrefix);
return json;
}
+ @Override
+ public String getTopicPrefix() {
+ return topicPrefix;
+ }
+
+ /**
+ * Sets the topic prefix
+ * @param topicPrefix
+ */
+ public void setTopicPrefix(String topicPrefix) {
+ this.topicPrefix = topicPrefix;
+ }
+
+ /**
+ * Set the topic pattern properties
+ * @param topicProperties
+ */
+ public void setTopicPatternProperties(Properties topicProperties) {
+ this.topicProperties = topicProperties;
+ }
+
+ @Override
+ public Properties getTopicPatternProperties() {
+ return topicProperties;
+ }
+
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java Wed May 8 17:59:52 2013
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command;
-
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.jms.DefaultJMSServerInfo;
-import org.apache.oozie.jms.JMSJobEventListener;
-import org.apache.oozie.jms.JMSServerInfo;
-import org.apache.oozie.service.JMSAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Command to create and retrieve the JMS ConnectionInfo bean
- *
- */
-public class JMSInfoXCommand extends XCommand<JMSConnectionInfoBean> {
-
- private final String jobId;
- private final Configuration conf = Services.get().getConf();
-
- /**
- * Constructor for creating the JMSInfoXcommand
- * @param jobId
- */
- public JMSInfoXCommand(String jobId) {
- super("jms_info", "jms_info", 0);
- this.jobId = jobId;
- }
-
- protected JMSConnectionInfoBean execute() throws CommandException {
- String connectionProperties = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
- Class<?> defaultClazz = conf.getClass(JMSAccessorService.JMS_PRODUCER_CONNECTION_INFO_IMPL, DefaultJMSServerInfo.class);
- JMSServerInfo jmsServerInfo = null;
- try {
- if (defaultClazz == DefaultJMSServerInfo.class) {
- jmsServerInfo = new DefaultJMSServerInfo();
- }
- else {
- jmsServerInfo = (JMSServerInfo) ReflectionUtils.newInstance(defaultClazz, null);
- }
- JMSConnectionInfoBean jmsBean = jmsServerInfo.getJMSConnectionInfoBean(connectionProperties, jobId);
- return jmsBean;
- }
- catch (Exception e) {
- throw new CommandException(ErrorCode.E1601, e.getMessage(), e);
- }
-
- }
-
- @Override
- protected boolean isLockRequired() {
- return false;
- }
-
- @Override
- public String getEntityKey() {
- return jobId;
- }
-
- @Override
- protected void loadState() throws CommandException {
- }
-
- @Override
- protected void verifyPrecondition() throws CommandException, PreconditionException {
- }
-
-}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java Wed May 8 17:59:52 2013
@@ -30,7 +30,7 @@ import org.apache.oozie.service.Services
public class MessageFactory {
public static final String OOZIE_MESSAGE_FORMAT = Services.get().getConf().get("message.format", "json");
- public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.message.serialize.";
+ public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.jms.serialize.";
private static class MessageSerializerHolder {
private static String messageSerializerInstance = Services
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java Wed May 8 17:59:52 2013
@@ -100,7 +100,7 @@ public interface ConnectionContext {
* @return
* @throws JMSException
*/
- public ThreadLocal<Session> createThreadLocalSession(final int sessionOpts);
+ public Session createThreadLocalSession(final int sessionOpts) throws JMSException;
/**
* Closes the connection
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java Wed May 8 17:59:52 2013
@@ -38,7 +38,6 @@ public class DefaultConnectionContext im
protected Connection connection;
protected String connectionFactoryName;
private static XLog LOG = XLog.getLog(ConnectionContext.class);
- private int sessionOpts;
@Override
public void createConnection(Properties props) throws NamingException, JMSException {
@@ -58,7 +57,6 @@ public class DefaultConnectionContext im
LOG.error("Error in JMS connection", je);
}
});
-
}
catch (JMSException e1) {
LOG.error(e1.getMessage(), e1);
@@ -69,6 +67,9 @@ public class DefaultConnectionContext im
catch (Exception e2) {
LOG.error(e2.getMessage(), e2);
}
+ finally {
+ connection = null;
+ }
}
throw e1;
}
@@ -86,6 +87,9 @@ public class DefaultConnectionContext im
@Override
public Session createSession(int sessionOpts) throws JMSException {
+ if (connection == null) {
+ throw new JMSException ("Connection is not initialized");
+ }
return connection.createSession(false, sessionOpts);
}
@@ -105,32 +109,34 @@ public class DefaultConnectionContext im
@Override
public void close() {
- try {
- connection.close();
- }
- catch (JMSException e) {
- LOG.warn("Unable to close the connection " + connection, e);
- }
- }
-
- private final ThreadLocal<Session> th = new ThreadLocal<Session>() {
- protected Session initialValue() {
+ if (connection != null) {
try {
- return connection.createSession(false, sessionOpts);
+ connection.close();
}
catch (JMSException e) {
- throw new RuntimeException("Session couldn't be created", e);
+ LOG.warn("Unable to close the connection " + connection, e);
+ }
+ finally {
+ connection = null;
}
}
- };
+ th = null;
+ }
+
+ private ThreadLocal<Session> th = new ThreadLocal<Session>();
@Override
- public ThreadLocal<Session> createThreadLocalSession(final int sessionOpts) {
- this.sessionOpts = sessionOpts;
- return th;
+ public Session createThreadLocalSession(final int sessionOpts) throws JMSException {
+ Session session = th.get();
+ if (session != null) {
+ return session;
+ }
+ th.remove();
+ session = createSession(sessionOpts);
+ th.set(session);
+ return session;
}
-
@Override
public MessageConsumer createConsumer(Session session, String topicName, String selector) throws JMSException {
Topic topic = session.createTopic(topicName);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java Wed May 8 17:59:52 2013
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.jms;
-
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.service.JMSTopicService;
-import org.apache.oozie.service.Services;
-
-/**
- * Default implementation to retrieve the JMS ConnectionInfo bean
- */
-public class DefaultJMSServerInfo implements JMSServerInfo {
- private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
-
- @Override
- public JMSConnectionInfoBean getJMSConnectionInfoBean(String connectionProperties, String jobId) throws Exception {
- JMSConnectionInfoBean jmsBean = new JMSConnectionInfoBean();
- JMSConnectionInfo jmsInfo = new JMSConnectionInfo(connectionProperties);
- jmsBean.setJNDIProperties(jmsInfo.getJNDIProperties());
- if (jmsTopicService != null) {
- jmsBean.setTopicName(jmsTopicService.getTopic(jobId));
- }
- else {
- throw new Exception(
- "Topic name cannot be retrieved as JMSTopicService is not initialized. JMS notification"
- + "may not be enabled");
- }
- return jmsBean;
- }
-
-}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java Wed May 8 17:59:52 2013
@@ -47,9 +47,8 @@ public class JMSExceptionListener implem
public void onException(JMSException exception) {
LOG.warn("Received JMSException for [{0}]", connInfo, exception);
connCtxt.close();
- JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
- jmsService.removeConnInfo(connInfo);
if (retry) {
+ JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
jmsService.reestablishConnection(connInfo);
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java Wed May 8 17:59:52 2013
@@ -20,6 +20,7 @@ package org.apache.oozie.jms;
import java.util.Map;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -50,7 +51,7 @@ public class JMSJobEventListener extends
private JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
private JMSConnectionInfo connInfo;
- public static final String JMS_CONNECTION_PROPERTIES = "oozie.jms.producer.notification.connection";
+ public static final String JMS_CONNECTION_PROPERTIES = "oozie.jms.producer.connection.properties";
public static final String JMS_SESSION_OPTS = "oozie.jms.producer.session.opts";
public static final String JMS_DELIVERY_MODE = "oozie.jms.delivery.mode";
public static final String JMS_EXPIRATION_DATE = "oozie.jms.expiration.date";
@@ -75,27 +76,24 @@ public class JMSJobEventListener extends
protected void sendMessage(Map<String, String> messageProperties, String messageBody, String topicName,
String messageFormat) {
- jmsContext = jmsService.createConnectionContext(connInfo, false);
- ThreadLocal<Session> threadLocal = null;
- try {
- threadLocal = jmsContext.createThreadLocalSession(jmsSessionOpts);
- Session session = threadLocal.get();
- TextMessage textMessage = session.createTextMessage(messageBody);
- for (Map.Entry<String, String> property : messageProperties.entrySet()) {
- textMessage.setStringProperty(property.getKey(), property.getValue());
+ jmsContext = jmsService.createProducerConnectionContext(connInfo);
+ if (jmsContext != null) {
+ try {
+ Session session = jmsContext.createThreadLocalSession(jmsSessionOpts);
+ TextMessage textMessage = session.createTextMessage(messageBody);
+ for (Map.Entry<String, String> property : messageProperties.entrySet()) {
+ textMessage.setStringProperty(property.getKey(), property.getValue());
+ }
+ textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat);
+ LOG.trace("Event related JMS message [{0}]", textMessage.toString());
+ MessageProducer producer = jmsContext.createProducer(session, topicName);
+ producer.setDeliveryMode(jmsDeliveryMode);
+ producer.setTimeToLive(jmsExpirationDate);
+ producer.send(textMessage);
+ producer.close();
}
- textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat);
- LOG.trace("Event related JMS message [{0}]", textMessage.getText());
- MessageProducer producer = jmsContext.createProducer(session, topicName);
- producer.setDeliveryMode(jmsDeliveryMode);
- producer.setTimeToLive(jmsExpirationDate);
- producer.send(textMessage);
- producer.close();
- }
- catch (Exception jmse) {
- LOG.error("Exception happened while sending event related jms message", jmse);
- if (threadLocal != null) {
- threadLocal.remove();
+ catch (JMSException jmse) {
+ LOG.error("Exception happened while sending event related jms message", jmse);
}
}
@@ -104,17 +102,17 @@ public class JMSJobEventListener extends
@Override
public void onWorkflowJobEvent(WorkflowJobEvent event) {
WorkflowJobMessage wfJobMessage = MessageFactory.createWorkflowJobMessage(event);
- serializeAndSendJMSMessage(wfJobMessage, getTopic(event));
+ serializeJMSMessage(wfJobMessage, getTopic(event));
}
@Override
public void onCoordinatorActionEvent(CoordinatorActionEvent event) {
CoordinatorActionMessage coordActionMessage = MessageFactory.createCoordinatorActionMessage(event);
- serializeAndSendJMSMessage(coordActionMessage, getTopic(event));
+ serializeJMSMessage(coordActionMessage, getTopic(event));
}
- private void serializeAndSendJMSMessage(JobMessage jobMessage, String topicName) {
+ private void serializeJMSMessage(JobMessage jobMessage, String topicName) {
MessageSerializer serializer = MessageFactory.getMessageSerializer();
String messageBody = serializer.getSerializedObject(jobMessage);
sendMessage(jobMessage.getMessageProperties(), messageBody, topicName, serializer.getMessageFormat());
@@ -140,27 +138,18 @@ public class JMSJobEventListener extends
@Override
public void onWorkflowActionEvent(WorkflowActionEvent wae) {
- LOG.warn("Sending jms message related to workflow action is not supported");
}
@Override
public void onCoordinatorJobEvent(CoordinatorJobEvent wje) {
- LOG.warn("Sending jms message related to coordinator job is not supported");
}
@Override
public void onBundleJobEvent(BundleJobEvent wje) {
- LOG.warn("Sending jms message related to bundle job is not supported");
}
@Override
public void destroy() {
- if (jmsContext != null) {
- LOG.debug("Closing JMS connection");
- jmsContext.close();
- jmsService.removeConnInfo(connInfo);
- }
-
- }
+}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java Wed May 8 17:59:52 2013
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.jms;
-
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-
-/**
- * Get the JMS ConnectionInfoBean
- */
-public interface JMSServerInfo {
-
- /**
- * Retrive the conn info bean using conn properties and job id
- * @param connectionProperties the jms producer conn properties
- * @param jobId the job id
- * @return
- * @throws Exception
- */
- public JMSConnectionInfoBean getJMSConnectionInfoBean(String connectionProperties, String jobId) throws Exception;
-
-}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java Wed May 8 17:59:52 2013
@@ -54,7 +54,6 @@ public class JMSAccessorService implemen
public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay";
public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier";
public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts";
- public static final String JMS_PRODUCER_CONNECTION_INFO_IMPL = CONF_PREFIX + "jms.producer.connection.info.impl";
private static XLog LOG;
private Configuration conf;
@@ -62,6 +61,7 @@ public class JMSAccessorService implemen
private int retryInitialDelay;
private int retryMultiplier;
private int retryMaxAttempts;
+ private ConnectionContext jmsProducerConnContext;
/**
* Map of JMS connection info to established JMS Connection
@@ -106,7 +106,7 @@ public class JMSAccessorService implemen
if (!topicsMap.containsKey(topic)) {
synchronized (topicsMap) {
if (!topicsMap.containsKey(topic)) {
- ConnectionContext connCtxt = createConnectionContext(connInfo, true);
+ ConnectionContext connCtxt = createConnectionContext(connInfo);
if (connCtxt == null) {
queueTopicForRetry(connInfo, topic, msgHandler);
return;
@@ -255,13 +255,13 @@ public class JMSAccessorService implemen
}
}
- public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo, boolean retry) {
+ public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
ConnectionContext connCtxt = connectionMap.get(connInfo);
if (connCtxt == null) {
try {
connCtxt = getConnectionContextImpl();
connCtxt.createConnection(connInfo.getJNDIProperties());
- connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, retry));
+ connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, true));
connectionMap.put(connInfo, connCtxt);
LOG.info("Connection established to JMS Server for [{0}]", connInfo);
}
@@ -273,6 +273,30 @@ public class JMSAccessorService implemen
return connCtxt;
}
+ public ConnectionContext createProducerConnectionContext(JMSConnectionInfo connInfo) {
+ if (jmsProducerConnContext != null && jmsProducerConnContext.isConnectionInitialized()) {
+ return jmsProducerConnContext;
+ }
+ else {
+ synchronized (this) {
+ if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) {
+ try {
+ jmsProducerConnContext = getConnectionContextImpl();
+ jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
+ jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo,
+ jmsProducerConnContext, false));
+ LOG.info("Connection established to JMS Server for [{0}]", connInfo);
+ }
+ catch (Exception e) {
+ LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
+ return null;
+ }
+ }
+ }
+ }
+ return jmsProducerConnContext;
+ }
+
private ConnectionContext getConnectionContextImpl() {
Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, DefaultConnectionContext.class);
ConnectionContext connCtx = null;
@@ -297,23 +321,15 @@ public class JMSAccessorService implemen
@Override
public void destroy() {
LOG.info("Destroying JMSAccessor service ");
- LOG.info("Closing JMS sessions");
- for (Map<String, MessageReceiver> topicsMap : receiversMap.values()) {
- for (MessageReceiver receiver : topicsMap.values()) {
- try {
- receiver.getSession().close();
- }
- catch (JMSException e) {
- LOG.warn("Unable to close session " + receiver.getSession(), e);
- }
- }
- }
receiversMap.clear();
LOG.info("Closing JMS connections");
for (ConnectionContext conn : connectionMap.values()) {
conn.close();
}
+ if (jmsProducerConnContext != null) {
+ jmsProducerConnContext.close();
+ }
connectionMap.clear();
}
@@ -322,16 +338,13 @@ public class JMSAccessorService implemen
return JMSAccessorService.class;
}
- public void removeConnInfo(JMSConnectionInfo connInfo){
- connectionMap.remove(connInfo);
- }
-
/**
* Reestablish connection for the given JMS connect information
* @param connInfo JMS connection info
*/
public void reestablishConnection(JMSConnectionInfo connInfo) {
// Queue the connection and topics for retry
+ connectionMap.remove(connInfo);
ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo);
if (listeningTopicsMap != null) {
@@ -339,12 +352,6 @@ public class JMSAccessorService implemen
for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) {
MessageReceiver receiver = topicEntry.getValue();
retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler());
- try {
- receiver.getSession().close();
- }
- catch (JMSException e) {
- LOG.warn("Unable to close session " + receiver.getSession(), e);
- }
}
}
}
@@ -366,7 +373,7 @@ public class JMSAccessorService implemen
LOG.info("Attempting retry of connection [{0}]", connInfo);
connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier);
- ConnectionContext connCtxt = createConnectionContext(connInfo, true);
+ ConnectionContext connCtxt = createConnectionContext(connInfo);
boolean shouldRetry = false;
if (connCtxt == null) {
shouldRetry = true;
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java Wed May 8 17:59:52 2013
@@ -21,9 +21,12 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.Event.AppType;
import org.apache.oozie.event.CoordinatorActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor;
@@ -32,6 +35,7 @@ import org.apache.oozie.executor.jpa.JPA
import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor;
import org.apache.oozie.util.XLog;
+
/**
* JMS Topic service to retrieve topic names from events or job id
*
@@ -252,6 +256,30 @@ public class JMSTopicService implements
return topicName;
}
+ public Properties getTopicPatternProperties() {
+ Properties props = new Properties();
+ String wfTopic = topicMap.get(JobType.WORKFLOW.value);
+ wfTopic = (wfTopic != null) ? wfTopic : defaultTopicName;
+ props.put(AppType.WORKFLOW_JOB, wfTopic);
+ props.put(AppType.WORKFLOW_ACTION, wfTopic);
+
+ String coordTopic = topicMap.get(JobType.COORDINATOR.value);
+ coordTopic = (coordTopic != null) ? coordTopic : defaultTopicName;
+ props.put(AppType.COORDINATOR_JOB, coordTopic);
+ props.put(AppType.COORDINATOR_ACTION, coordTopic);
+
+ String bundleTopic = topicMap.get(JobType.BUNDLE.value);
+ bundleTopic = (bundleTopic != null) ? bundleTopic : defaultTopicName;
+ props.put(AppType.BUNDLE_JOB, bundleTopic);
+ props.put(AppType.BUNDLE_ACTION, bundleTopic);
+
+ return props;
+ }
+
+ public String getTopicPrefix() {
+ return topicPrefix;
+ }
+
@Override
public void destroy() {
topicMap.clear();
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java Wed May 8 17:59:52 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,6 +26,7 @@ import javax.servlet.http.HttpServletReq
import javax.servlet.http.HttpServletResponse;
import org.apache.oozie.BuildInfo;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.AuthorizationException;
@@ -74,6 +75,18 @@ public abstract class BaseAdminServlet e
}*/
}
+
+ /**
+ * Get the JMS connection information that is sent back, as JSON, when the
+ * admin resource {@code RestConstants.ADMIN_JMS_INFO} is requested.
+ * Each API version supplies its own implementation.
+ *
+ * @param request servlet request
+ * @param response servlet response
+ * @return the bean holding the JMS connection information
+ * @throws XServletException if the information cannot be provided
+ * @throws IOException declared for implementations that perform I/O
+ */
+ abstract JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
+ throws XServletException, IOException;
+
+
/**
* Return safemode state, instrumentation, configuration, osEnv or
* javaSysProps
@@ -123,8 +136,16 @@ public abstract class BaseAdminServlet e
json.put(JsonTags.AVAILABLE_TIME_ZONES, availableTimeZonesToJsonArray());
sendJsonResponse(response, HttpServletResponse.SC_OK, json);
}
+ else if (resource.equals(RestConstants.ADMIN_JMS_INFO)) {
+ String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null ? "GMT" : request
+ .getParameter(RestConstants.TIME_ZONE_PARAM);
+ JsonBean jmsBean = getJMSConnectionInfo(request, response);
+ sendJsonResponse(response, HttpServletResponse.SC_OK, jmsBean, timeZoneId);
+ }
+
}
+
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
IOException {
@@ -182,7 +203,7 @@ public abstract class BaseAdminServlet e
throws XServletException;
protected abstract void getQueueDump(JSONObject json) throws XServletException;
-
+
private static final JSONArray GMTOffsetTimeZones = new JSONArray();
static {
prepareGMTOffsetTimeZones();
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java Wed May 8 17:59:52 2013
@@ -30,6 +30,7 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.AuthorizationException;
import org.apache.oozie.service.AuthorizationService;
@@ -236,11 +237,13 @@ public abstract class BaseJobServlet ext
sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId);
}
- else if (show.equals(RestConstants.JOB_SHOW_JMS_INFO)){
+ else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) {
stopCron();
- JsonBean jmsBean = getJMSConnectionInfo (request, response);
+ String jmsTopicName = getJMSTopicName(request, response);
+ JSONObject json = new JSONObject();
+ json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName);
startCron();
- sendJsonResponse(response, HttpServletResponse.SC_OK, jmsBean, timeZoneId);
+ sendJsonResponse(response, HttpServletResponse.SC_OK, json);
}
else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
@@ -380,14 +383,13 @@ public abstract class BaseJobServlet ext
abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
throws XServletException, IOException;
-
/**
- * abstract method to get JMS connection details for a job
+ * abstract method to get JMS topic name for a job
* @param request
* @param response
* @throws XServletException
* @throws IOException
*/
- abstract JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
+ abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response)
throws XServletException, IOException;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java Wed May 8 17:59:52 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.servlet;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -25,11 +26,13 @@ import javax.servlet.http.HttpServletRes
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.Services;
import org.json.simple.JSONObject;
+@SuppressWarnings("unchecked")
public class V0AdminServlet extends BaseAdminServlet {
private static final long serialVersionUID = 1L;
private static final String INSTRUMENTATION_NAME = "v0admin";
@@ -109,4 +112,10 @@ public class V0AdminServlet extends Base
protected void getQueueDump(JSONObject json) throws XServletException {
throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0301, "cannot get queue dump");
}
+
+    /**
+     * JMS connection information is not offered by the v0 API: every call is
+     * rejected with a bad-request response carrying error code E0302.
+     *
+     * @param request servlet request (unused)
+     * @param response servlet response (unused)
+     * @return never returns normally
+     * @throws XServletException always
+     */
+    @Override
+    protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
+            throws XServletException, IOException {
+        final String reason = "Not supported in v0";
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, reason);
+    }
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java Wed May 8 17:59:52 2013
@@ -211,7 +211,8 @@ public class V0JobServlet extends BaseJo
}
@Override
- protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
- throws XServletException, IOException {
- throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);}
+ protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
+ IOException {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
+ }
}
\ No newline at end of file
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java Wed May 8 17:59:52 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,18 +17,26 @@
*/
package org.apache.oozie.servlet;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.jms.JMSConnectionInfo;
+import org.apache.oozie.jms.JMSJobEventListener;
import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.service.JMSTopicService;
import org.apache.oozie.service.Services;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@@ -37,7 +45,7 @@ public class V1AdminServlet extends Base
private static final long serialVersionUID = 1L;
private static final String INSTRUMENTATION_NAME = "v1admin";
- private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[8];
+ private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[9];
static {
RESOURCES_INFO[0] = new ResourceInfo(RestConstants.ADMIN_STATUS_RESOURCE, Arrays.asList("PUT", "GET"),
@@ -57,13 +65,20 @@ public class V1AdminServlet extends Base
Collections.EMPTY_LIST);
RESOURCES_INFO[7] = new ResourceInfo((RestConstants.ADMIN_TIME_ZONES_RESOURCE), Arrays.asList("GET"),
Collections.EMPTY_LIST);
+ RESOURCES_INFO[8] = new ResourceInfo((RestConstants.ADMIN_JMS_INFO), Arrays.asList("GET"),
+ Collections.EMPTY_LIST);
+
}
- public V1AdminServlet() {
- super(INSTRUMENTATION_NAME, RESOURCES_INFO);
+ protected V1AdminServlet(String name) {
+ super(name, RESOURCES_INFO);
modeTag = RestConstants.ADMIN_SYSTEM_MODE_PARAM;
}
+ public V1AdminServlet() {
+ this(INSTRUMENTATION_NAME);
+ }
+
/*
* (non-Javadoc)
*
@@ -131,4 +146,11 @@ public class V1AdminServlet extends Base
json.put(JsonTags.UNIQUE_MAP_DUMP, uniqueDumpArray);
}
+    /**
+     * JMS connection information is not offered by the v1 API: every call is
+     * rejected with a bad-request response carrying error code E0302.
+     *
+     * @param request servlet request (unused)
+     * @param response servlet response (unused)
+     * @return never returns normally
+     * @throws XServletException always
+     */
+    @Override
+    protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
+            throws XServletException, IOException {
+        final String reason = "Not supported in v1";
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, reason);
+    }
+
+
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java?rev=1480380&r1=1480379&r2=1480380&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java Wed May 8 17:59:52 2013
@@ -737,20 +737,6 @@ public class V1JobServlet extends BaseJo
return consoleBase;
}
- protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response) throws XServletException{
- JsonBean jmsBean = null;
- String jobId = getResourceName(request);
- DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
- getAuthToken(request));
- try {
- jmsBean = (JsonBean) dagEngine.getJMSConnectionInfo(jobId);
- }
- catch (DagEngineException ex) {
- throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
- }
- return jmsBean;
- }
-
/**
* Get wf action info
*
@@ -1009,4 +995,11 @@ public class V1JobServlet extends BaseJo
throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
}
}
+
+    /**
+     * The JMS topic name of a job is not offered by the v1 API: every call is
+     * rejected with a bad-request response carrying error code E0302.
+     *
+     * @param request servlet request (unused)
+     * @param response servlet response (unused)
+     * @return never returns normally
+     * @throws XServletException always
+     */
+    @Override
+    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response)
+            throws XServletException, IOException {
+        final String reason = "Not supported in v1";
+        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, reason);
+    }
+
}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java?rev=1480380&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V2AdminServlet.java Wed May 8 17:59:52 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.servlet;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.jms.JMSConnectionInfo;
+import org.apache.oozie.jms.JMSJobEventListener;
+import org.apache.oozie.service.JMSTopicService;
+import org.apache.oozie.service.Services;
+
+/**
+ * Version 2 of the admin servlet. It is registered with the same resources as
+ * the V1 admin servlet and additionally answers requests for the JMS connection
+ * information.
+ */
+public class V2AdminServlet extends V1AdminServlet {
+
+    private static final long serialVersionUID = 1L;
+    private static final String INSTRUMENTATION_NAME = "v2admin";
+
+    /**
+     * Creates the servlet under the "v2admin" instrumentation name.
+     */
+    public V2AdminServlet() {
+        super(INSTRUMENTATION_NAME);
+    }
+
+    /**
+     * Builds the JMS connection information returned to clients: the JNDI
+     * properties from the server configuration (without authentication
+     * details), the topic prefix and the topic pattern of every application type.
+     *
+     * @param request servlet request (unused)
+     * @param response servlet response (unused)
+     * @return the bean describing how to connect to the JMS server
+     * @throws XServletException with a bad-request status and error code E1601
+     *         if the JMS connection properties are not configured or the
+     *         {@link JMSTopicService} is not available
+     * @throws IOException declared by the overridden method
+     */
+    @Override
+    protected JsonBean getJMSConnectionInfo(HttpServletRequest request, HttpServletResponse response)
+            throws XServletException, IOException {
+        Configuration conf = Services.get().getConf();
+        JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
+        String connectionProperties = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
+        if (connectionProperties == null) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E1601,
+                    "JMS connection property is not defined");
+        }
+
+        JMSConnectionInfoBean jmsBean = new JMSConnectionInfoBean();
+        JMSConnectionInfo jmsInfo = new JMSConnectionInfo(connectionProperties);
+        Properties jmsInfoProps = jmsInfo.getJNDIProperties();
+        // Authentication details of the server must not be handed out to clients:
+        // strip the password along with the principal.
+        jmsInfoProps.remove("java.naming.security.principal");
+        jmsInfoProps.remove("java.naming.security.credentials");
+        jmsBean.setJNDIProperties(jmsInfoProps);
+
+        if (jmsTopicService == null) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E1601,
+                    "JMSTopicService is not initialized. JMS notification "
+                            + "may not be enabled");
+        }
+        jmsBean.setTopicPrefix(jmsTopicService.getTopicPrefix());
+        jmsBean.setTopicPatternProperties(jmsTopicService.getTopicPatternProperties());
+        return jmsBean;
+    }
+
+}