You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/04/18 01:55:21 UTC
svn commit: r1469108 [1/3] - in /oozie/trunk: ./ client/
client/src/main/java/org/apache/oozie/cli/
client/src/main/java/org/apache/oozie/client/
client/src/main/java/org/apache/oozie/client/event/jms/
client/src/main/java/org/apache/oozie/client/event...
Author: virag
Date: Wed Apr 17 23:55:20 2013
New Revision: 1469108
URL: http://svn.apache.org/r1469108
Log:
Combined patch for OOZIE-1234 and OOZIE-1235
Added:
oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java
oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/
oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForUserJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultJMSServerInfo.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSServerInfo.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/TestJMSInfoXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestDefaultConnectionContext.java
oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestJMSServerInfo.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
Modified:
oozie/trunk/client/pom.xml
oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
oozie/trunk/core/pom.xml
oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java
oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestV1JobServlet.java
oozie/trunk/pom.xml
oozie/trunk/release-log.txt
Modified: oozie/trunk/client/pom.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/client/pom.xml?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/pom.xml (original)
+++ oozie/trunk/client/pom.xml Wed Apr 17 23:55:20 2013
@@ -49,6 +49,24 @@
</dependency>
<dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<scope>compile</scope>
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java Wed Apr 17 23:55:20 2013
@@ -56,6 +56,7 @@ import org.apache.oozie.client.BulkRespo
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.JMSConnectionInfo;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowAction;
@@ -110,6 +111,7 @@ public class OozieCLI {
public static final String ACTION_OPTION = "action";
public static final String DEFINITION_OPTION = "definition";
public static final String CONFIG_CONTENT_OPTION = "configcontent";
+ public static final String JMS_INFO_OPTION = "jmsinfo";
public static final String DO_AS_OPTION = "doas";
@@ -253,6 +255,7 @@ public class OozieCLI {
Option changeValue = new Option(CHANGE_VALUE_OPTION, true,
"new endtime/concurrency/pausetime value for changing a coordinator job");
Option info = new Option(INFO_OPTION, true, "info of a job");
+ Option jmsInfo = new Option (JMS_INFO_OPTION, true, "JMS Topic name and JNDI connection string for a job");
Option offset = new Option(OFFSET_OPTION, true, "job info offset of actions (default '1', requires -info)");
Option len = new Option(LEN_OPTION, true, "number of actions (default TOTAL ACTIONS, requires -info)");
Option filter = new Option(FILTER_OPTION, true,
@@ -289,6 +292,7 @@ public class OozieCLI {
actions.addOption(kill);
actions.addOption(change);
actions.addOption(info);
+ actions.addOption(jmsInfo);
actions.addOption(rerun);
actions.addOption(log);
actions.addOption(definition);
@@ -747,6 +751,9 @@ public class OozieCLI {
}
try {
+ if (options.contains(JMS_INFO_OPTION)) {
+ printJMSInfo(wc.getJMSConnectionInfo(commandLine.getOptionValue(JMS_INFO_OPTION)));
+ }
if (options.contains(SUBMIT_OPTION)) {
System.out.println(JOB_ID_PREFIX + wc.submit(getConfiguration(wc, commandLine)));
}
@@ -1077,6 +1084,11 @@ public class OozieCLI {
}
}
+ /**
+ * Print the topic name and the JNDI properties of the given JMS connection info to standard output.
+ *
+ * @param jmsInfo the JMS connection info to print
+ */
+ // NOTE(review): jmsInfo is dereferenced without a null check - confirm the caller never passes null
+ private void printJMSInfo (JMSConnectionInfo jmsInfo) {
+ System.out.println("Topic Name : " + jmsInfo.getTopicName());
+ System.out.println("JNDI Properties : " + jmsInfo.getJNDIProperties());
+ }
+
private void printWorkflowAction(WorkflowAction action, String timeZoneId, boolean verbose) {
System.out.println("ID : " + maskIfNull(action.getId()));
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/JMSConnectionInfo.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client;
+
+import java.util.Properties;
+
+/**
+ * JMS connection related information: the topic name and the JNDI properties
+ * used for establishing a connection to the JMS server.
+ */
+public interface JMSConnectionInfo {
+
+ /**
+ * Retrieve the JMS topic name.
+ *
+ * @return the topic name
+ */
+ String getTopicName();
+
+ /**
+ * Retrieve the JNDI properties for establishing a connection to the JMS server.
+ *
+ * @return the JNDI properties
+ */
+ Properties getJNDIProperties();
+
+}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java Wed Apr 17 23:55:20 2013
@@ -667,6 +667,27 @@ public class OozieClient {
}
}
+
+ /**
+ * Client call that fetches the JMS connection info of a job. It issues a GET on the
+ * RestConstants.JOB resource with the RestConstants.JOB_SHOW_PARAM parameter set to
+ * RestConstants.JOB_SHOW_JMS_INFO.
+ */
+ private class JMSInfo extends ClientCallable<JMSConnectionInfo> {
+
+ /**
+ * @param jobId id of the job; validated with notEmpty before the request is built
+ */
+ JMSInfo(String jobId) {
+ super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
+ RestConstants.JOB_SHOW_JMS_INFO));
+ }
+
+ /**
+ * On an HTTP 200 response, parses the response body as JSON and converts it with
+ * JsonToBean.createJMSConnectionInfo. Any other response code is passed to handleError.
+ *
+ * @param conn the connection the request was sent on
+ * @return the JMS connection info, or null if handleError returns normally
+ */
+ protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException {
+ if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
+ // NOTE(review): the reader uses the platform default charset and is not closed in this method -
+ // confirm whether cleanup is handled elsewhere
+ Reader reader = new InputStreamReader(conn.getInputStream());
+ JSONObject json = (JSONObject) JSONValue.parse(reader);
+ return JsonToBean.createJMSConnectionInfo(json);
+ }
+ else {
+ handleError(conn);
+ }
+ // Only reached on a non-OK response when handleError did not throw
+ return null;
+ }
+ }
+
private class WorkflowActionInfo extends ClientCallable<WorkflowAction> {
WorkflowActionInfo(String actionId) {
super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
@@ -699,6 +720,16 @@ public class OozieClient {
}
/**
+ * Get the JMS connection info (topic name and JNDI properties) for a job.
+ * @param jobId id of the job whose JMS connection info is requested
+ * @return JMSConnectionInfo object
+ * @throws OozieClientException propagated from the underlying client call; presumably raised when the request fails
+ */
+ public JMSConnectionInfo getJMSConnectionInfo(String jobId) throws OozieClientException {
+ return new JMSInfo(jobId).call();
+ }
+
+ /**
* Get the info of a workflow job and subset actions.
*
* @param jobId job Id.
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSHeaderConstants.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.jms;
+
+/**
+ * Class holding the names of the JMS message properties used in JMS selectors
+ * and of the property that carries the message format.
+ */
+// NOTE(review): constants-only class with an implicit public constructor - consider a private one
+public final class JMSHeaderConstants {
+ // JMS Application specific properties for selectors
+ public static final String EVENT_STATUS = "EVENT_STATUS";
+ public static final String APP_NAME = "APP_NAME";
+ public static final String USER = "USER";
+ public static final String MESSAGE_TYPE = "MESSAGE_TYPE";
+ public static final String APP_TYPE = "APP_TYPE";
+ // JMS Header property naming the format of the message body
+ public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT";
+}
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JMSMessagingUtils.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.oozie.client.event.message.EventMessage;
+
+/**
+ * Client utility to convert JMS message to EventMessage object
+ */
+public class JMSMessagingUtils {
+
+ // Key prefix in oozie_client.properties; the message format is appended to form the lookup key
+ private static final String DESERIALIZER_PROP = "oozie.msg.deserializer.";
+
+ /**
+ * Constructs the EventMessage object from JMS message.
+ * <p>
+ * If the classpath resource "oozie_client.properties" is not found, the message is
+ * deserialized with JSONMessageDeserializer. Otherwise the deserializer class name is
+ * looked up in that resource under DESERIALIZER_PROP followed by the value of the
+ * MESSAGE_FORMAT property of the message.
+ *
+ * @param msg the JMS message; it is cast to TextMessage
+ * @return the EventMessage
+ * @throws IOException if loading the properties resource fails
+ * @throws JMSException if reading the message property or deserializing the message raises it
+ * @throws IllegalArgumentException if msg is null or the configured deserializer cannot be created
+ */
+ public static EventMessage getEventMessage(Message msg) throws IOException, JMSException {
+ if (msg == null) {
+ throw new IllegalArgumentException("Could not extract EventMessage as JMS message is null");
+ }
+ // NOTE(review): unchecked cast - a message that is not a TextMessage raises ClassCastException here
+ TextMessage textMessage = (TextMessage) msg;
+ // NOTE(review): presumably null when the property is not set; the lookup key below would then end in "null"
+ String msgFormat = msg.getStringProperty(JMSHeaderConstants.MESSAGE_FORMAT);
+
+ Properties jmsDeserializerInfo = new Properties();
+ // NOTE(review): this stream is never closed in this method
+ InputStream is = JMSMessagingUtils.class.getClassLoader().getResourceAsStream("oozie_client.properties");
+
+ if (is == null) {
+ // No client properties on the classpath: fall back to the JSON deserializer
+ // NOTE(review): prints to stdout on every call that takes this branch
+ System.out.println("Using default deserializer");
+ return new JSONMessageDeserializer().getEventMessage(textMessage);
+ }
+ else {
+ jmsDeserializerInfo.load(is);
+ MessageDeserializer deserializer = getDeserializer((String) jmsDeserializerInfo.get(DESERIALIZER_PROP
+ + msgFormat));
+ return deserializer.getEventMessage(textMessage);
+ }
+ }
+
+ /**
+ * Creates the deserializer for the given class name using its no-argument constructor.
+ *
+ * @param deserializerString the deserializer class name, or null to get a JSONMessageDeserializer
+ * @return the deserializer instance
+ * @throws IllegalArgumentException if the class cannot be loaded, instantiated or cast to MessageDeserializer
+ */
+ private static MessageDeserializer getDeserializer(String deserializerString) {
+ if (deserializerString == null) {
+ return new JSONMessageDeserializer();
+ }
+ else {
+ try {
+ MessageDeserializer msgDeserializer = (MessageDeserializer) Class.forName(deserializerString)
+ .newInstance();
+ return msgDeserializer;
+ }
+ // Catches every failure of the reflective creation, including the cast above, not only ClassNotFoundException
+ catch (Exception cnfe) {
+ throw new IllegalArgumentException("Could not access class " + deserializerString, cnfe);
+ }
+
+ }
+ }
+}
\ No newline at end of file
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/JSONMessageDeserializer.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.WorkflowJobMessage;
+
+/**
+ * Message deserializer to convert from JSON to java object
+ */
+public class JSONMessageDeserializer extends MessageDeserializer {
+
+ /**
+ * Deserializes the message body into an instance of the given class using the shared mapper.
+ *
+ * @param messageBody the JSON message body
+ * @param clazz the class to deserialize into
+ * @return the deserialized object
+ * @throws IllegalArgumentException if the mapper fails for any reason; the original exception is the cause
+ */
+ @Override
+ public <T> T getDeserializedObject(String messageBody, Class<T> clazz) {
+ try {
+ return mapper.readValue(messageBody, clazz);
+ }
+ catch (Exception exception) {
+ throw new IllegalArgumentException("Could not deserialize the JMS message using "
+ + clazz.getCanonicalName(), exception);
+ }
+ }
+
+ /**
+ * Copies the APP_TYPE, MESSAGE_TYPE, EVENT_STATUS, APP_NAME and USER properties of the JMS message
+ * onto the workflow job message.
+ *
+ * @param workflowJobMsg the workflow job message to populate
+ * @param message the JMS message the properties are read from
+ * @return the same workflowJobMsg instance
+ * @throws JMSException if reading a property from the JMS message fails
+ */
+ // NOTE(review): the valueOf calls presumably fail if a property is missing or has an unknown value - confirm
+ @Override
+ public WorkflowJobMessage setPropertiesForObject(WorkflowJobMessage workflowJobMsg, Message message)
+ throws JMSException {
+ workflowJobMsg.setAppType(AppType.valueOf(message.getStringProperty(JMSHeaderConstants.APP_TYPE)));
+ workflowJobMsg.setMessageType(MessageType.valueOf(message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)));
+ workflowJobMsg.setEventStatus(EventStatus.valueOf(message.getStringProperty(JMSHeaderConstants.EVENT_STATUS)));
+ workflowJobMsg.setAppName(message.getStringProperty(JMSHeaderConstants.APP_NAME));
+ workflowJobMsg.setUser(message.getStringProperty(JMSHeaderConstants.USER));
+ return workflowJobMsg;
+
+ }
+
+ /**
+ * Copies the APP_TYPE, MESSAGE_TYPE, EVENT_STATUS, APP_NAME and USER properties of the JMS message
+ * onto the coordinator action message.
+ *
+ * @param coordActionMsg the coordinator action message to populate
+ * @param message the JMS message the properties are read from
+ * @return the same coordActionMsg instance
+ * @throws JMSException if reading a property from the JMS message fails
+ */
+ @Override
+ public CoordinatorActionMessage setPropertiesForObject(CoordinatorActionMessage coordActionMsg, Message message)
+ throws JMSException {
+ coordActionMsg.setAppType(AppType.valueOf(message.getStringProperty(JMSHeaderConstants.APP_TYPE)));
+ coordActionMsg.setMessageType(MessageType.valueOf(message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)));
+ coordActionMsg.setEventStatus(EventStatus.valueOf(message.getStringProperty(JMSHeaderConstants.EVENT_STATUS)));
+ coordActionMsg.setAppName(message.getStringProperty(JMSHeaderConstants.APP_NAME));
+ coordActionMsg.setUser(message.getStringProperty(JMSHeaderConstants.USER));
+ return coordActionMsg;
+ }
+
+}
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/jms/MessageDeserializer.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.jms;
+
+import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.EventMessage;
+import org.apache.oozie.client.event.message.WorkflowJobMessage;
+import org.apache.oozie.client.event.Event;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.DeserializationConfig;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+
+/**
+ * Class to deserialize the jms message to java object
+ */
+public abstract class MessageDeserializer {
+ // Mapper shared by all deserializers in this package
+ // NOTE(review): declared static but neither final nor private, so it can be reassigned from within the package
+ static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
+
+ static {
+ // Ignore JSON fields that have no matching property on the target class instead of failing
+ mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ /**
+ * Constructs the event message from JMS message.
+ * <p>
+ * The APP_TYPE property of the message selects the target class: WORKFLOW_JOB yields a
+ * WorkflowJobMessage and COORDINATOR_ACTION a CoordinatorActionMessage. The message body is
+ * deserialized into that class and the JMS properties are then applied to it.
+ *
+ * @param message the JMS message; it is cast to TextMessage to read the body
+ * @return EventMessage
+ * @throws JMSException if reading the property or the text of the message fails
+ * @throws IllegalArgumentException if the app type or the message body is null or empty
+ * @throws UnsupportedOperationException if the app type is any other value of Event.AppType
+ */
+ public EventMessage getEventMessage(Message message) throws JMSException {
+ String appTypeString = message.getStringProperty(JMSHeaderConstants.APP_TYPE);
+ String messageBody = ((TextMessage) message).getText();
+
+ if (appTypeString == null || appTypeString.isEmpty() || messageBody == null || messageBody.isEmpty()) {
+ throw new IllegalArgumentException("Could not extract OozieEventMessage. "
+ + "AppType and/or MessageBody is null/empty." + "Apptype is " + appTypeString + " MessageBody is "
+ + messageBody);
+ }
+ switch (Event.AppType.valueOf(appTypeString)) {
+ case WORKFLOW_JOB:
+ WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
+ return setPropertiesForObject(wfJobMsg, message);
+ case COORDINATOR_ACTION:
+ CoordinatorActionMessage caActionMsg = getDeserializedObject(messageBody,
+ CoordinatorActionMessage.class);
+ return setPropertiesForObject(caActionMsg, message);
+ default:
+ throw new UnsupportedOperationException("Conversion of " + appTypeString
+ + " to Event message is not supported");
+ }
+
+ }
+
+ /**
+ * Deserialize the message body into an instance of the given class
+ *
+ * @param s the message body
+ * @param clazz the class to deserialize into
+ * @return the deserialized object
+ */
+ protected abstract <T> T getDeserializedObject(String s, Class<T> clazz);
+
+ /**
+ * Set the JMS selector properties for the workflow job message object
+ *
+ * @param wfJobMessage the workflow job message
+ * @param message the JMS message
+ * @return WorkflowJobMessage
+ * @throws JMSException
+ */
+ public abstract WorkflowJobMessage setPropertiesForObject(WorkflowJobMessage wfJobMessage, Message message)
+ throws JMSException;
+
+ /**
+ * Set the JMS selector properties for coordinator action message
+ *
+ * @param coordActionMessage the coordinator action message
+ * @param message the JMS message
+ * @return CoordinatorActionMessage
+ * @throws JMSException
+ */
+ public abstract CoordinatorActionMessage setPropertiesForObject(CoordinatorActionMessage coordActionMessage,
+ Message message) throws JMSException;
+
+}
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/CoordinatorActionMessage.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.message;
+
+import java.util.Date;
+
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.client.event.Event.AppType;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Class holding attributes related to Coordinator action message
+ *
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class CoordinatorActionMessage extends JobMessage {
+
+ @JsonProperty
+ private CoordinatorAction.Status status;
+ @JsonProperty
+ private Date nominalTime;
+ @JsonProperty
+ private String missingDependency;
+ @JsonProperty
+ private String errorCode;
+ @JsonProperty
+ private String errorMessage;
+
+ /**
+ * Default constructor
+ */
+ public CoordinatorActionMessage() {
+ // Default constructor for jackson
+ }
+
+ /**
+ * Constructs the coordinator action message
+ * @param eventStatus the event status
+ * @param coordinatorActionId the coord action id
+ * @param coordinatorJobId the parent job id
+ * @param startTime the created time of coord action
+ * @param endTime the end time of coord action
+ * @param nominalTime the nominal time of coord action
+ * @param status the status of coord action
+ * @param user the user of coordinator
+ * @param appName the app name of coordinator
+ * @param missingDependency the action's first missing dependency
+ * @param errorCode the action's error code
+ * @param errorMessage the action's error message
+ */
+ public CoordinatorActionMessage(EventStatus eventStatus, String coordinatorActionId,
+ String coordinatorJobId, Date startTime, Date endTime, Date nominalTime, CoordinatorAction.Status status,
+ String user, String appName, String missingDependency, String errorCode, String errorMessage) {
+ super(eventStatus, AppType.COORDINATOR_ACTION, coordinatorActionId, coordinatorJobId, startTime,
+ endTime, user, appName);
+ this.status = status;
+ this.nominalTime = nominalTime;
+ this.missingDependency = missingDependency;
+ this.errorCode = errorCode;
+ this.errorMessage = errorMessage;
+
+ }
+
+ /**
+ * Set the status of coordinator action
+ * @param status the status of coord action
+ */
+ public void setStatus(CoordinatorAction.Status status) {
+ this.status = status;
+ }
+
+ /**
+ * Get the status of coord action
+ * @return the CoordinatorAction status
+ */
+ public CoordinatorAction.Status getStatus() {
+ return status;
+ }
+
+ /**
+ * Set the nominal time
+ * @param nominalTime the nominal time of coord action
+ */
+ public void setNominalTime(Date nominalTime) {
+ this.nominalTime = nominalTime;
+ }
+
+ /**
+ * Get the nominal time
+ * @return the nominal time
+ */
+ public Date getNominalTime() {
+ return nominalTime;
+ }
+
+ /**
+ * Set the error code
+ * @param errorCode the action's error code
+ */
+ public void setErrorCode(String errorCode) {
+ this.errorCode = errorCode;
+ }
+
+ /**
+ * Get the error code
+ * @return the errorCode
+ */
+ public String getErrorCode() {
+ return errorCode;
+ }
+
+ /**
+ * Set the error message
+ * @param errorMessage the action's error message
+ */
+ public void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ /**
+ * Get the error message
+ * @return the errorMessage
+ */
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ /**
+ * Set the missing dependency
+ * @param missingDependency the action's first missing dependency
+ */
+ public void setMissingDependency(String missingDependency) {
+ this.missingDependency = missingDependency;
+ }
+
+ /**
+ * Get the missing dependency
+ * @return the missing Dependency
+ */
+ public String getMissingDependency() {
+ return missingDependency;
+ }
+
+}
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/EventMessage.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.message;
+
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.Event.AppType;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/**
+ * Base class which holds the attributes common to every event message:
+ * an application type and a message type
+ */
+public abstract class EventMessage {
+
+ private Event.AppType appType;
+ private Event.MessageType messageType;
+
+ /**
+ * Default no-argument constructor; leaves appType and messageType unset
+ */
+ public EventMessage() {
+ //Required for jackson
+ }
+
+ /**
+ * Constructs the event message using message type and app type
+ * @param messageType the message type
+ * @param appType the app type
+ */
+ protected EventMessage(MessageType messageType, AppType appType) {
+ this.messageType = messageType;
+ this.appType = appType;
+ }
+
+ /**
+ * Sets the appType for an event
+ * @param appType the app type to store
+ */
+ public void setAppType(AppType appType) {
+ this.appType = appType;
+ }
+
+ /**
+ * Returns the appType for an event (getter is annotated with JsonIgnore)
+ * @return the AppType
+ */
+ @JsonIgnore
+ public AppType getAppType() {
+ return appType;
+ }
+
+ /**
+ * Sets the message type for an event
+ * @param messageType the message type to store
+ */
+ public void setMessageType(MessageType messageType) {
+ this.messageType = messageType;
+ }
+
+ /**
+ * Returns the message type for an event (getter is annotated with JsonIgnore)
+ * @return the MessageType
+ */
+ @JsonIgnore
+ public MessageType getMessageType() {
+ return messageType;
+ }
+
+}
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/JobMessage.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.message;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.client.event.jms.JMSHeaderConstants;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+
+/**
+ * Class holding attributes related to a job message:
+ * ids, start/end times, event status, app name, user and JMS message properties
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class JobMessage extends EventMessage {
+
+ @JsonProperty
+ private String id;
+ @JsonProperty
+ private String parentId;
+ @JsonProperty
+ private Date startTime;
+ @JsonProperty
+ private Date endTime;
+
+ private JobEvent.EventStatus eventStatus;
+ private String appName;
+ private Map<String, String> jmsMessageProperties;
+ private String user;
+
+ /**
+ * Default constructor; all fields, including jmsMessageProperties, stay unset
+ */
+ public JobMessage() {
+ // Default constructor for jackson
+ }
+
+ /**
+ * Constructs a job message and fills the JMS message properties (app type, message type, event status, app name, user)
+ *
+ * @param eventStatus the event status; must not be null because toString() is called on it
+ * @param appType the appType for event; must not be null because toString() is called on it
+ * @param id the id of job
+ * @param parentId the parent id of job
+ * @param startTime the start time of job
+ * @param endTime the end time of job
+ * @param user user of the job
+ * @param appName appName for job
+ */
+ public JobMessage(JobEvent.EventStatus eventStatus, AppType appType, String id, String parentId, Date startTime,
+ Date endTime, String user, String appName) {
+ super(MessageType.JOB, appType);
+ this.eventStatus = eventStatus;
+ this.id = id;
+ this.parentId = parentId;
+ this.startTime = startTime;
+ this.appName = appName;
+ this.user = user;
+ this.endTime = endTime;
+
+ jmsMessageProperties = new HashMap<String, String>();
+ jmsMessageProperties.put(JMSHeaderConstants.APP_TYPE, appType.toString());
+ jmsMessageProperties.put(JMSHeaderConstants.MESSAGE_TYPE, MessageType.JOB.toString());
+ jmsMessageProperties.put(JMSHeaderConstants.EVENT_STATUS, eventStatus.toString());
+ jmsMessageProperties.put(JMSHeaderConstants.APP_NAME, appName);
+ jmsMessageProperties.put(JMSHeaderConstants.USER, user);
+ }
+
+ /**
+ * Sets the Job id for message
+ *
+ * @param id the job id
+ */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /**
+ * Gets the job id
+ *
+ * @return the Job id
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Sets the parent job id for message
+ *
+ * @param parentId the parent job id
+ */
+ public void setParentId(String parentId) {
+ this.parentId = parentId;
+ }
+
+ /**
+ * Gets the parent job id
+ *
+ * @return the parentId
+ */
+ public String getParentId() {
+ return parentId;
+ }
+
+ /**
+ * Sets the job start time for message
+ *
+ * @param startTime the job start time
+ */
+ public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ /**
+ * Gets the job start time
+ *
+ * @return the start time
+ */
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Sets the job end time for message
+ *
+ * @param endTime the job end time
+ */
+ public void setEndTime(Date endTime) {
+ this.endTime = endTime;
+ }
+
+ /**
+ * Gets the job end time
+ *
+ * @return the end time
+ */
+ public Date getEndTime() {
+ return endTime;
+ }
+
+ /**
+ * Sets the job's app name for message; the JMS message properties map is not updated
+ *
+ * @param appName the app name of the job
+ */
+ public void setAppName(String appName) {
+ this.appName = appName;
+ }
+
+ /**
+ * Gets the job's app name
+ *
+ * @return the app name
+ */
+ @JsonIgnore
+ public String getAppName() {
+ return appName;
+ }
+
+ /**
+ * Sets the JMS selectors for message, replacing the current properties map
+ *
+ * @param properties the jms selector key value pair
+ */
+ public void setMessageProperties(Map<String, String> properties) {
+ jmsMessageProperties = properties;
+ }
+
+ /**
+ * Gets the message properties
+ *
+ * @return the message properties; null if neither the full constructor nor the setter supplied a map
+ */
+ @JsonIgnore
+ public Map<String, String> getMessageProperties() {
+ return jmsMessageProperties;
+ }
+
+ /**
+ * Sets the job user for the message; the JMS message properties map is not updated
+ *
+ * @param user the user of the job
+ */
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ /**
+ * Gets the job user
+ *
+ * @return the user
+ */
+ @JsonIgnore
+ public String getUser() {
+ return user;
+ }
+
+ /**
+ * Sets the event status; the JMS message properties map is not updated
+ *
+ * @param eventStatus the event status to store
+ */
+ public void setEventStatus(JobEvent.EventStatus eventStatus) {
+ this.eventStatus = eventStatus;
+ }
+
+ /**
+ * Gets the event status
+ *
+ * @return the event status
+ */
+ @JsonIgnore
+ public JobEvent.EventStatus getEventStatus() {
+ return eventStatus;
+ }
+
+}
Added: oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java (added)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/event/message/WorkflowJobMessage.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.event.message;
+
+import java.util.Date;
+
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Class holding attributes related to a workflow job message:
+ * the JobMessage attributes plus workflow status, error code and error message
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class WorkflowJobMessage extends JobMessage {
+
+ @JsonProperty
+ private WorkflowJob.Status status;
+ @JsonProperty
+ private String errorCode;
+ @JsonProperty
+ private String errorMessage;
+
+ /**
+ * Default constructor; leaves status, errorCode and errorMessage unset
+ */
+ public WorkflowJobMessage() {
+ // Default constructor for jackson
+ }
+
+ /**
+ * Constructor for a workflow job message; the app type is fixed to Event.AppType.WORKFLOW_JOB
+ * @param eventStatus event status
+ * @param workflowJobId the workflow job id
+ * @param coordinatorActionId the parent coordinator action id (passed to the superclass as the parent id)
+ * @param startTime start time of workflow
+ * @param endTime end time of workflow
+ * @param status status of workflow
+ * @param user the user
+ * @param appName appName of workflow
+ * @param errorCode errorCode of the failed wf actions
+ * @param errorMessage errorMessage of the failed wf action
+ */
+ public WorkflowJobMessage(EventStatus eventStatus, String workflowJobId,
+ String coordinatorActionId, Date startTime, Date endTime, WorkflowJob.Status status, String user,
+ String appName, String errorCode, String errorMessage) {
+ super(eventStatus, Event.AppType.WORKFLOW_JOB, workflowJobId, coordinatorActionId, startTime,
+ endTime, user, appName);
+ this.status = status;
+ this.errorCode = errorCode;
+ this.errorMessage = errorMessage;
+ }
+
+ /**
+ * Set the workflow job status
+ * @param status the workflow job status to store
+ */
+ public void setStatus(WorkflowJob.Status status) {
+ this.status = status;
+ }
+
+ /**
+ * Get the workflow job status
+ * @return the workflow status
+ */
+ public WorkflowJob.Status getStatus() {
+ return status;
+ }
+
+ /**
+ * Set the workflow error code
+ * @param errorCode the error code to store
+ */
+ public void setErrorCode(String errorCode) {
+ this.errorCode = errorCode;
+ }
+
+ /**
+ * Get the workflow error code
+ * @return the error code
+ */
+ public String getErrorCode() {
+ return errorCode;
+ }
+
+ /**
+ * Set the workflow error message
+ * @param errorMessage the error message to store
+ */
+ public void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ /**
+ * Get the workflow error message
+ * @return the error message
+ */
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -196,4 +196,7 @@ public interface JsonTags {
public static final String TIME_ZOME_DISPLAY_NAME = "timezoneDisplayName";
public static final String TIME_ZONE_ID = "timezoneId";
+ public static final String JMS_TOPIC_NAME = "jmsTopic";
+ public static final String JMS_JNDI_PROPERTIES = "jmsJNDIProps";
+
}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,10 +21,12 @@ import org.apache.oozie.client.BulkRespo
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.JMSConnectionInfo;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@@ -34,6 +36,9 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
/**
* JSON to bean converter for {@link WorkflowAction}, {@link WorkflowJob}, {@link CoordinatorAction}
@@ -65,6 +70,7 @@ public class JsonToBean {
private static final Map<String, Property> COORD_ACTION = new HashMap<String, Property>();
private static final Map<String, Property> BUNDLE_JOB = new HashMap<String, Property>();
private static final Map<String, Property> BULK_RESPONSE = new HashMap<String, Property>();
+ private static final Map<String, Property> JMS_CONNECTION_INFO = new HashMap<String, Property>();
static {
WF_ACTION.put("getId", new Property(JsonTags.WORKFLOW_ACTION_ID, String.class));
@@ -178,6 +184,8 @@ public class JsonToBean {
BULK_RESPONSE.put("getCoordinator", new Property(JsonTags.BULK_RESPONSE_COORDINATOR, CoordinatorJob.class, true));
BULK_RESPONSE.put("getAction", new Property(JsonTags.BULK_RESPONSE_ACTION, CoordinatorAction.class, true));
+ JMS_CONNECTION_INFO.put("getTopicName", new Property(JsonTags.JMS_TOPIC_NAME, String.class));
+ JMS_CONNECTION_INFO.put("getJNDIProperties", new Property(JsonTags.JMS_JNDI_PROPERTIES, Properties.class));
}
/**
@@ -214,6 +222,7 @@ public class JsonToBean {
else if (prop.type == CoordinatorJob.class) {
return createCoordinatorJobList((JSONArray) json.get(prop.label));
}
+
else {
throw new RuntimeException("Unsupported list type : " + prop.type.getSimpleName());
}
@@ -243,6 +252,15 @@ public class JsonToBean {
else if (type == WorkflowAction.class) {
return createWorkflowAction((JSONObject) obj);
}
+ else if (type == Properties.class){
+ JSONObject jsonMap = (JSONObject)JSONValue.parse((String)obj);
+ Properties props = new Properties();
+ Set<Map.Entry> entrySet = jsonMap.entrySet();
+ for (Map.Entry jsonEntry: entrySet){
+ props.put(jsonEntry.getKey(), jsonEntry.getValue());
+ }
+ return props;
+ }
else {
throw new RuntimeException("Unsupported type : " + type.getSimpleName());
}
@@ -339,6 +357,18 @@ public class JsonToBean {
new JsonInvocationHandler(COORD_JOB, json));
}
+
+ /**
+ * Creates a JMSConnectionInfo bean from a JSON object.
+ *
+ * @param json json object.
+ * @return a JMS connection info bean (a dynamic proxy) populated with the JSON object values.
+ */
+ public static JMSConnectionInfo createJMSConnectionInfo(JSONObject json) {
+ return (JMSConnectionInfo) Proxy.newProxyInstance(JsonToBean.class.getClassLoader(),
+ new Class[] { JMSConnectionInfo.class }, new JsonInvocationHandler(JMS_CONNECTION_INFO, json));
+ }
+
/**
* Creates a list of coordinator job beans from a JSON array.
*
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java Wed Apr 17 23:55:20 2013
@@ -146,8 +146,10 @@ public interface RestConstants {
public static final String SLA = "sla";
public static final String DO_AS_PARAM = "doAs";
-
+
public static final String TIME_ZONE_PARAM = "timezone";
-
+
public static final String ADMIN_TIME_ZONES_RESOURCE = "available-timezones";
+
+ public static final String JOB_SHOW_JMS_INFO = "jmsinfo";
}
Modified: oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java (original)
+++ oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,12 +20,15 @@ package org.apache.oozie.client.rest;
import junit.framework.TestCase;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.JMSConnectionInfo;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
import java.util.List;
+import java.util.Properties;
public class TestJsonToBean extends TestCase {
@@ -312,4 +315,25 @@ public class TestJsonToBean extends Test
assertEquals("cj2", list.get(1).getId());
}
+
+ private JSONObject createJMSInfoJSONObject(){ // fixture: topic "topic" plus JNDI properties k1=v1, k2=v2
+ JSONObject json = new JSONObject();
+ json.put(JsonTags.JMS_TOPIC_NAME, "topic");
+ Properties props = new Properties();
+ props.put("k1", "v1");
+ props.put("k2", "v2");
+ json.put(JsonTags.JMS_JNDI_PROPERTIES, JSONValue.toJSONString(props)); // properties are stored as a JSON-encoded string
+ return json;
+ }
+
+ public void testParseJMSInfo() {
+ JSONObject json = createJMSInfoJSONObject();
+ JMSConnectionInfo jmsDetails = JsonToBean.createJMSConnectionInfo(json);
+ assertEquals("topic", jmsDetails.getTopicName());
+ Properties jmsProps = jmsDetails.getJNDIProperties();
+ assertNotNull(jmsDetails.getJNDIProperties());
+ assertEquals("v1", jmsProps.get("k1"));
+ // NOTE(review): only k1 is verified; k2 is put into the fixture but never asserted
+ }
+
}
Modified: oozie/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/pom.xml?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/pom.xml (original)
+++ oozie/trunk/core/pom.xml Wed Apr 17 23:55:20 2013
@@ -256,12 +256,6 @@
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-client</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<scope>test</scope>
</dependency>
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java Wed Apr 17 23:55:20 2013
@@ -23,6 +23,9 @@ import java.io.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.JMSInfoXCommand;
public abstract class BaseEngine {
public static final String USE_XCOMMAND = "oozie.useXCommand";
@@ -194,4 +197,20 @@ public abstract class BaseEngine {
*/
public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
+ /**
+ * Return the JMS connection info for a job by running a JMSInfoXCommand.
+ *
+ * @param jobId job Id.
+ * @return the JMS info bean
+ * @throws DagEngineException thrown if the command fails with a CommandException.
+ */
+ public JMSConnectionInfoBean getJMSConnectionInfo(String jobId) throws DagEngineException {
+ try {
+ return new JMSInfoXCommand(jobId).call();
+ }
+ catch (CommandException ex) {
+ throw new DagEngineException(ex);
+ }
+ }
+
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java Wed Apr 17 23:55:20 2013
@@ -76,7 +76,9 @@ import org.apache.openjpa.persistence.jd
"c.id, c.appName, c.status FROM CoordinatorActionBean a, CoordinatorJobBean c " +
"WHERE a.jobId = c.id AND c.bundleId = :bundleId ORDER BY a.jobId, a.createdTimestamp"),
- @NamedQuery(name = "BULK_MONITOR_COUNT_QUERY", query = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c") })
+ @NamedQuery(name = "BULK_MONITOR_COUNT_QUERY", query = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c"),
+
+ @NamedQuery(name = "GET_BUNDLE_JOB_FOR_USER", query = "select w.user from BundleJobBean w where w.id = :id") })
public class BundleJobBean extends JsonBundleJob implements Writable {
@Basic
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java Wed Apr 17 23:55:20 2013
@@ -75,7 +75,10 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_COORD_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from CoordinatorJobBean w where w.bundleId = :parentId and (w.status NOT IN ('SUCCEEDED', 'FAILED', 'KILLED', 'DONEWITHERROR') OR w.lastModifiedTimestamp >= :lastModTime)"),
- @NamedQuery(name = "GET_COORD_JOB_FOR_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id")
+ @NamedQuery(name = "GET_COORD_JOB_FOR_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id"),
+
+ @NamedQuery(name = "GET_COORD_JOB_FOR_USER", query = "select w.user from CoordinatorJobBean w where w.id = :id")
+
})
public class CoordinatorJobBean extends JsonCoordinatorJob implements Writable {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java Wed Apr 17 23:55:20 2013
@@ -234,6 +234,8 @@ public enum ErrorCode {
E1501(XLog.STD, "Error in getting HCat Access [{0}]"),
+ E1601(XLog.STD, "Cannot retrieve JMS connection info [{0}]"),
+
ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
private String template;
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java Wed Apr 17 23:55:20 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -71,8 +71,9 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_WORKFLOWS_WITH_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId = :parentId"),
- @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.status = 'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.endTimestamp >= :endTime)")
+ @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.status = 'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.endTimestamp >= :endTime)"),
+ @NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from WorkflowJobBean w where w.id = :id")
})
public class WorkflowJobBean extends JsonWorkflowJob implements Writable {
Added: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JMSConnectionInfoBean.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.rest;
+
+import java.util.Properties;
+
+import org.apache.oozie.client.JMSConnectionInfo;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+/**
+ * JMS connection info bean representing the JMS related information for a job
+ * (a topic name and the JNDI properties), serializable to a JSON object
+ */
+public class JMSConnectionInfoBean implements JMSConnectionInfo, JsonBean {
+
+ private Properties JNDIProperties;
+ private String topicName;
+
+
+ @Override
+ public JSONObject toJSONObject() {
+ return toJSONObject("GMT");
+ }
+
+ /**
+ * Set the topic name
+ * @param topicName the JMS topic name to store
+ */
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ /**
+ * Set the JNDI properties for jms connection
+ * @param JNDIProperties the JNDI properties to store
+ */
+ public void setJNDIProperties(Properties JNDIProperties) {
+ this.JNDIProperties = JNDIProperties;
+ }
+
+ @Override
+ public String getTopicName() {
+ return topicName;
+ }
+
+ @Override
+ public Properties getJNDIProperties() {
+ return JNDIProperties;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public JSONObject toJSONObject(String timeZoneId) { // timeZoneId is not used by this implementation
+ JSONObject json = new JSONObject();
+ json.put(JsonTags.JMS_JNDI_PROPERTIES, JSONValue.toJSONString(JNDIProperties)); // properties are stored as a JSON-encoded string
+ json.put(JsonTags.JMS_TOPIC_NAME, topicName);
+ return json;
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/JMSInfoXCommand.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.rest.JMSConnectionInfoBean;
+import org.apache.oozie.jms.DefaultJMSServerInfo;
+import org.apache.oozie.jms.JMSJobEventListener;
+import org.apache.oozie.jms.JMSServerInfo;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Command to create and retrieve the JMS connection info bean for a job.
+ * <p>
+ * The bean is produced by the {@link JMSServerInfo} implementation configured under
+ * {@link JMSAccessorService#JMS_PRODUCER_CONNECTION_INFO_IMPL};
+ * {@link DefaultJMSServerInfo} is used when nothing is configured.
+ */
+public class JMSInfoXCommand extends XCommand<JMSConnectionInfoBean> {
+
+    private final String jobId;
+    private final Configuration conf = Services.get().getConf();
+
+    /**
+     * Constructor for creating the JMSInfoXCommand.
+     *
+     * @param jobId id of the job whose JMS connection info is requested
+     */
+    public JMSInfoXCommand(String jobId) {
+        super("jms_info", "jms_info", 0);
+        this.jobId = jobId;
+    }
+
+    /**
+     * Build the JMS connection info bean for the job.
+     * <p>
+     * Reads the connection properties string from the configuration, instantiates the
+     * configured {@link JMSServerInfo} and asks it for the bean.
+     *
+     * @return the bean returned by the server info implementation
+     * @throws CommandException with {@link ErrorCode#E1601} wrapping any exception raised
+     *         while instantiating the implementation or building the bean
+     */
+    @Override
+    protected JMSConnectionInfoBean execute() throws CommandException {
+        String connectionProperties = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
+        Class<?> serverInfoClass = conf.getClass(JMSAccessorService.JMS_PRODUCER_CONNECTION_INFO_IMPL,
+                DefaultJMSServerInfo.class);
+        try {
+            JMSServerInfo jmsServerInfo;
+            if (serverInfoClass == DefaultJMSServerInfo.class) {
+                // The default implementation is created directly, without reflection.
+                jmsServerInfo = new DefaultJMSServerInfo();
+            }
+            else {
+                jmsServerInfo = (JMSServerInfo) ReflectionUtils.newInstance(serverInfoClass, null);
+            }
+            return jmsServerInfo.getJMSConnectionInfoBean(connectionProperties, jobId);
+        }
+        catch (Exception e) {
+            throw new CommandException(ErrorCode.E1601, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @return false; this command does not ask for a lock
+     */
+    @Override
+    protected boolean isLockRequired() {
+        return false;
+    }
+
+    /**
+     * @return the job id this command was created with
+     */
+    @Override
+    public String getEntityKey() {
+        return jobId;
+    }
+
+    /**
+     * Nothing to load for this command.
+     */
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    /**
+     * No precondition is checked for this command.
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java Wed Apr 17 23:55:20 2013
@@ -110,7 +110,7 @@ public class CoordinatorActionEvent exte
}
public void setErrorMessage(String msg) {
- errorCode = msg;
+ errorMessage = msg;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java?rev=1469108&r1=1469107&r2=1469108&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java Wed Apr 17 23:55:20 2013
@@ -40,129 +40,33 @@ public abstract class JobEventListener {
public abstract void destroy();
/**
- * On workflow job transition to start state
+ * On workflow job transition
* @param WorkflowJobEvent
*/
- public abstract void onWorkflowJobStart(WorkflowJobEvent wje);
+ public abstract void onWorkflowJobEvent(WorkflowJobEvent wje);
/**
- * On workflow job transition to success state
- * @param WorkflowJobEvent
- */
- public abstract void onWorkflowJobSuccess(WorkflowJobEvent wje);
-
- /**
- * On workflow job transition to failure state
- * @param WorkflowJobEvent
- */
- public abstract void onWorkflowJobFailure(WorkflowJobEvent wje);
-
- /**
- * On workflow job transition to suspend state
- * @param WorkflowJobEvent
- */
- public abstract void onWorkflowJobSuspend(WorkflowJobEvent wje);
-
- /**
- * On workflow action transition to start state
- * @param WorkflowActionEvent
- */
- public abstract void onWorkflowActionStart(WorkflowActionEvent wae);
-
- /**
- * On workflow action transition to success state
+ * On workflow action transition
* @param WorkflowActionEvent
*/
- public abstract void onWorkflowActionSuccess(WorkflowActionEvent wae);
-
- /**
- * On workflow action transition to failure state
- * @param WorkflowActionEvent
- */
- public abstract void onWorkflowActionFailure(WorkflowActionEvent wae);
-
- /**
- * On workflow action transition to suspend state
- * @param WorkflowActionEvent
- */
- public abstract void onWorkflowActionSuspend(WorkflowActionEvent wae);
-
- /**
- * On coord job transition to start state
- * @param CoordinatorJobEvent
- */
- public abstract void onCoordinatorJobStart(CoordinatorJobEvent wje);
-
- /**
- * On coord job transition to success state
- * @param CoordinatorJobEvent
- */
- public abstract void onCoordinatorJobSuccess(CoordinatorJobEvent wje);
+ public abstract void onWorkflowActionEvent(WorkflowActionEvent wae);
/**
- * On coord job transition to failure state
+ * On coordinator job transition
* @param CoordinatorJobEvent
*/
- public abstract void onCoordinatorJobFailure(CoordinatorJobEvent wje);
+ public abstract void onCoordinatorJobEvent(CoordinatorJobEvent wje);
/**
- * On coord job transition to suspend state
- * @param CoordinatorJobEvent
- */
- public abstract void onCoordinatorJobSuspend(CoordinatorJobEvent wje);
-
- /**
- * On coord action transition to waiting state
- * @param CoordinatorActionEvent
- */
- public abstract void onCoordinatorActionWaiting(CoordinatorActionEvent wae);
-
- /**
- * On coord action transition to start state
- * @param CoordinatorActionEvent
- */
- public abstract void onCoordinatorActionStart(CoordinatorActionEvent wae);
-
- /**
- * On coord action transition to success state
- * @param CoordinatorActionEvent
- */
- public abstract void onCoordinatorActionSuccess(CoordinatorActionEvent wae);
-
- /**
- * On coord action transition to failure state
+ * On coordinator action transition
* @param CoordinatorActionEvent
*/
- public abstract void onCoordinatorActionFailure(CoordinatorActionEvent wae);
-
- /**
- * On coord action transition to suspend state
- * @param CoordinatorActionEvent
- */
- public abstract void onCoordinatorActionSuspend(CoordinatorActionEvent wae);
-
- /**
- * On bundle job transition to start state
- * @param BundleJobEvent
- */
- public abstract void onBundleJobStart(BundleJobEvent wje);
-
- /**
- * On bundle job transition to success state
- * @param BundleJobEvent
- */
- public abstract void onBundleJobSuccess(BundleJobEvent wje);
-
- /**
- * On bundle job transition to failure state
- * @param BundleJobEvent
- */
- public abstract void onBundleJobFailure(BundleJobEvent wje);
+ public abstract void onCoordinatorActionEvent(CoordinatorActionEvent wae);
/**
- * On bundle job transition to suspend state
+ * On bundle job transition
* @param BundleJobEvent
*/
- public abstract void onBundleJobSuspend(BundleJobEvent wje);
+ public abstract void onBundleJobEvent(BundleJobEvent wje);
}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/JSONMessageSerializer.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event.messaging;
+
+/**
+ * {@link MessageSerializer} that serializes objects to JSON strings using the
+ * mapper inherited from the base class.
+ */
+public class JSONMessageSerializer extends MessageSerializer {
+
+    /** Format name reported by {@link #getMessageFormat()}. */
+    public static final String MESSAGE_FORMAT = "json";
+
+    /**
+     * Get the message format for this serializer.
+     *
+     * @return {@link #MESSAGE_FORMAT}
+     */
+    @Override
+    public String getMessageFormat() {
+        return MESSAGE_FORMAT;
+    }
+
+    /**
+     * Serialize the object to a JSON string.
+     *
+     * @param object the java object
+     * @return the JSON string written by the mapper
+     * @throws IllegalArgumentException wrapping any exception raised by the mapper
+     */
+    @Override
+    public String getSerializedObject(Object object) {
+        final String json;
+        try {
+            json = mapper.writeValueAsString(object);
+        }
+        catch (Exception e) {
+            throw new IllegalArgumentException("Could not construct JSONMessage.", e);
+        }
+        return json;
+    }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageFactory.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event.messaging;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.client.event.message.CoordinatorActionMessage;
+import org.apache.oozie.client.event.message.WorkflowJobMessage;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.service.Services;
+
+/**
+ * Factory that builds client messages from server events and hands out the
+ * configured {@link MessageSerializer}.
+ */
+public class MessageFactory {
+
+    /**
+     * Message format, read from the "message.format" configuration property when
+     * this class is initialized; "json" if the property is not set.
+     */
+    public static final String OOZIE_MESSAGE_FORMAT = Services.get().getConf().get("message.format", "json");
+
+    /** Prefix of the configuration property naming the serializer class for a format. */
+    public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.message.serialize.";
+
+    /**
+     * Holds the single serializer instance; it is created when this nested class
+     * is initialized.
+     */
+    private static class MessageSerializerHolder {
+        public static final MessageSerializer INSTANCE;
+        static {
+            // Property name is the prefix plus the format; JSON serializer is the fallback.
+            String serializerClassName = Services.get().getConf().get(
+                    OOZIE_MESSAGE_SERIALIZE + OOZIE_MESSAGE_FORMAT,
+                    "org.apache.oozie.event.messaging.JSONMessageSerializer");
+            try {
+                Class<?> serializerClass = Class.forName(serializerClassName);
+                INSTANCE = (MessageSerializer) ReflectionUtils.newInstance(serializerClass, null);
+            }
+            catch (ClassNotFoundException cnfe) {
+                throw new IllegalStateException("Could not construct the serializer ", cnfe);
+            }
+        }
+    }
+
+    /**
+     * Gets the configured serializer.
+     *
+     * @return the serializer instance kept by the holder
+     */
+    public static MessageSerializer getMessageSerializer() {
+        return MessageSerializerHolder.INSTANCE;
+    }
+
+    /**
+     * Constructs the workflow job message for a workflow job event by copying the
+     * event's fields.
+     *
+     * @param wfJobEvent the workflow job event
+     * @return a new workflow job message
+     */
+    public static WorkflowJobMessage createWorkflowJobMessage(WorkflowJobEvent wfJobEvent) {
+        return new WorkflowJobMessage(
+                wfJobEvent.getEventStatus(),
+                wfJobEvent.getId(),
+                wfJobEvent.getParentId(),
+                wfJobEvent.getStartTime(),
+                wfJobEvent.getEndTime(),
+                wfJobEvent.getStatus(),
+                wfJobEvent.getUser(),
+                wfJobEvent.getAppName(),
+                wfJobEvent.getErrorCode(),
+                wfJobEvent.getErrorMessage());
+    }
+
+    /**
+     * Constructs the coordinator action message for a coordinator action event by
+     * copying the event's fields.
+     *
+     * @param coordActionEvent the coordinator action event
+     * @return a new coordinator action message
+     */
+    public static CoordinatorActionMessage createCoordinatorActionMessage(CoordinatorActionEvent coordActionEvent) {
+        return new CoordinatorActionMessage(
+                coordActionEvent.getEventStatus(),
+                coordActionEvent.getId(),
+                coordActionEvent.getParentId(),
+                coordActionEvent.getStartTime(),
+                coordActionEvent.getEndTime(),
+                coordActionEvent.getNominalTime(),
+                coordActionEvent.getStatus(),
+                coordActionEvent.getUser(),
+                coordActionEvent.getAppName(),
+                coordActionEvent.getMissingDeps(),
+                coordActionEvent.getErrorCode(),
+                coordActionEvent.getErrorMessage());
+    }
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/messaging/MessageSerializer.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event.messaging;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Message serializer: turns a java object into its string form for a given
+ * message format.
+ */
+public abstract class MessageSerializer {
+
+    /**
+     * Mapper shared by the serializers in this package. Declared final so the
+     * shared instance cannot be swapped out after class initialization.
+     */
+    static final ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
+
+    /**
+     * Get the serialized string from object.
+     *
+     * @param object the java object
+     * @return the serialized form of the object
+     */
+    public abstract String getSerializedObject(Object object);
+
+    /**
+     * Get the message format for the serializer.
+     *
+     * @return the name of the format this serializer produces
+     */
+    public abstract String getMessageFormat();
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java?rev=1469108&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobGetForUserJPAExecutor.java Wed Apr 17 23:55:20 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Retrieve the name of the user submitting the bundle job.
+ */
+public class BundleJobGetForUserJPAExecutor implements JPAExecutor<String> {
+
+    private final String bundleJobId;
+
+    /**
+     * Create the executor for the given bundle job.
+     *
+     * @param bundleJobId id of the bundle job; checked with {@code ParamChecker.notNull}
+     */
+    public BundleJobGetForUserJPAExecutor(String bundleJobId) {
+        ParamChecker.notNull(bundleJobId, "bundleJobId");
+        this.bundleJobId = bundleJobId;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BundleJobGetForUserJPAExecutor";
+    }
+
+    /**
+     * Run the named query "GET_BUNDLE_JOB_FOR_USER" with the bundle job id bound to
+     * the "id" parameter and return its single result.
+     *
+     * @param em the entity manager used to create the query
+     * @return the single query result cast to a String
+     * @throws JPAExecutorException with {@link ErrorCode#E0603} wrapping any exception
+     *         raised while creating or running the query
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
+     */
+    @Override
+    public String execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = em.createNamedQuery("GET_BUNDLE_JOB_FOR_USER");
+            q.setParameter("id", bundleJobId);
+            return (String) q.getSingleResult();
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+}