You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ka...@apache.org on 2012/10/26 23:29:35 UTC
svn commit: r1402670 - in /oozie/branches/hcat-intre: ./ core/
core/src/main/java/org/apache/oozie/service/ core/src/main/resources/
core/src/test/java/org/apache/oozie/service/
Author: kamrul
Date: Fri Oct 26 21:29:35 2012
New Revision: 1402670
URL: http://svn.apache.org/viewvc?rev=1402670&view=rev
Log:
OOZIE-1032 Create JMSService used for any JMS compliant product (Mohammad)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
Modified:
oozie/branches/hcat-intre/core/pom.xml
oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
oozie/branches/hcat-intre/pom.xml
oozie/branches/hcat-intre/release-log.txt
Modified: oozie/branches/hcat-intre/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/pom.xml?rev=1402670&r1=1402669&r2=1402670&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/pom.xml (original)
+++ oozie/branches/hcat-intre/core/pom.xml Fri Oct 26 21:29:35 2012
@@ -265,6 +265,12 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-all</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
<!-- For drawing runtime DAG -->
<dependency>
<groupId>net.sf.jung</groupId>
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1402670&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java Fri Oct 26 21:29:35 2012
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.XLog;
+
+/**
+ * This class will 1. Create/Manage JMS connections using user configured
+ * properties 2. Create/Manage session for specific connection/topic. 3. Provide
+ * a way to create a subscriber and publisher 4. Pure JMS complian
+ * (implementation independent but primarily tested against Apace ActiveMQ) For
+ * connection property, it reads property from oozie-site.xml. Since it supports
+ * multiple connections, each property will be grouped with fixed tag. the
+ * caller will use the tag to accees the connection/session/subscriber/producer.
+ */
+public class JMSAccessorService implements Service {
+ public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
+ public static final String JMS_CONNECTION_FACTORY = CONF_PREFIX + "jms.connectionFactory";
+ public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "connections";
+ public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
+ public static final String DEFAULT_SERVER_ENDPOINT = "default";
+
+ private static XLog LOG;
+ private Configuration conf;
+ ConcurrentHashMap<String, ConnectionContext> connSessionMap = new ConcurrentHashMap<String, ConnectionContext>();
+ HashMap<String, Properties> hmConnProps = new HashMap<String, Properties>();
+
+ @Override
+ public void init(Services services) throws ServiceException {
+ LOG = XLog.getLog(getClass());
+ conf = services.getConf();
+ parseConfiguration(conf);
+ establishConnections();
+ }
+
+ /**
+ * Returns Consumer object for specific service end point and topic name
+ *
+ * @param endPoint : Service end-point (preferably HCatalog server address)
+ * to determine the JMS connection properties
+ * @param topicName : topic to listen on
+ * @return : MessageConsumer to receive JMS message
+ * @throws JMSException
+ */
+ public MessageConsumer getMessageConsumer(String endPoint, String topicName) throws JMSException {
+ ConnectionContext connCtx = getConnectionContext(endPoint);
+ MessageConsumer ret = null;
+ if (connCtx != null) {
+ ret = connCtx.getConsumer(topicName);
+ }
+ return ret;
+ }
+
+ /**
+ * Returns Producer object for specific service end point and topic name
+ *
+ * @param endPoint : Service end-point (preferably HCatalog server address)
+ * to determine the JMS connection properties
+ * @param topicName : topic to send message
+ * @return : MessageProducer to send JMS message
+ * @throws JMSException
+ */
+ public MessageProducer getMessageProducer(String endPoint, String topicName) throws JMSException {
+ ConnectionContext connCtx = getConnectionContext(endPoint);
+ MessageProducer ret = null;
+ if (connCtx != null) {
+ ret = connCtx.getProducer(topicName);
+ }
+ return ret;
+ }
+
+ /**
+ * Returns JMS session object for specific service end point and topic name
+ *
+ * @param endPoint : Service end-point (preferably HCatalog server address)
+ * to determine the JMS connection properties
+ * @param topicName : topic to listen on
+ * @return : Session to send/receive JMS message
+ * @throws JMSException
+ */
+ public Session getSession(String endPoint, String topicName) throws JMSException {
+ ConnectionContext connCtx = getConnectionContext(endPoint);
+ Session ret = null;
+ if (connCtx != null) {
+ ret = connCtx.getSession(topicName);
+ }
+ return ret;
+ }
+
+ /**
+ * Returns JMS connection context object for specific service end point
+ *
+ * @param endPoint : Service end-point (preferably HCatalog server address)
+ * to determine the JMS connection properties
+ * @return : Connection context to send/receive JMS message
+ */
+ public ConnectionContext getConnectionContext(String endPoint) {
+ ConnectionContext ret = null;
+ if (connSessionMap.containsKey(endPoint)) {
+ ret = connSessionMap.get(endPoint);
+ }
+ else {
+ LOG.warn("Connection doesn't exists for end point " + endPoint);
+ }
+ return ret;
+ }
+
+ /**
+ * Remove JMS session object for specific service end point and topic name
+ *
+ * @param endPoint : Service end-point (preferably HCatalog server address)
+ * to determine the JMS connection properties
+ * @param topicName : topic to listen on
+ * @throws JMSException
+ */
+ public void removeSession(String endPoint, String topicName) throws JMSException {
+ ConnectionContext connCtx = getConnectionContext(endPoint);
+ if (connCtx != null) {
+ connCtx.returnSession(topicName);
+ }
+ return;
+ }
+
+ private void establishConnections() throws ServiceException {
+ for (String key : hmConnProps.keySet()) {
+ connSessionMap.put(key, new ConnectionContext(getConnection(hmConnProps.get(key))));
+ }
+ }
+
+ private void parseConfiguration(Configuration conf) {
+ String[] keyVals = conf.getStrings(JMS_CONNECTIONS_PROPERTIES, "");
+ for (String kVal : keyVals) {
+ LOG.info("Key=value " + kVal);
+ if (kVal.trim().length() > 0) {
+ addToHM(kVal);
+ }
+ }
+ }
+
+ private void addToHM(String kVal) {
+ int pos = kVal.indexOf("=");
+ Properties props = new Properties();
+ if (pos > 0) {
+ String val = kVal.substring(pos + 1);
+ String[] propArr = val.split(";");
+ for (String pair : propArr) {
+ String[] kV = pair.split("#");
+ if (kV.length > 1) {
+ props.put(kV[0].trim(), kV[1].trim());
+ }
+ else {
+ LOG.info("Unformatted properties. Expected key#value : " + pair);
+ }
+ }
+ String key = kVal.substring(0, pos);
+ LOG.info(key + ": Adding " + props);
+ hmConnProps.put(key.trim(), props);
+ }
+ else {
+ LOG.info("Unformatted properties. Expected two parts : " + kVal);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public Class<? extends Service> getInterface() {
+ return JMSAccessorService.class;
+ }
+
+ /*
+ * Look up connection factory Create connection
+ */
+ private Connection getConnection(Properties props) throws ServiceException {
+
+ Connection conn;
+ try {
+ Context jndiContext = getJndiContext(props);
+ String connFacName = (String) jndiContext.getEnvironment().get(JMS_CONNECTION_FACTORY);
+ if (connFacName == null || connFacName.trim().length() == 0) {
+ connFacName = "ConnectionFactory";
+ }
+ ConnectionFactory connFac = (ConnectionFactory) jndiContext.lookup(connFacName);
+ LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
+ conn = connFac.createConnection();
+ conn.start();
+ conn.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException je) {
+ LOG.error(je);
+ }
+ });
+
+ }
+ catch (NamingException e) {
+ throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
+ }
+ catch (JMSException e) {
+ throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
+ }
+ return conn;
+ }
+
+ /*
+ * Create a JNDI API InitialContext object
+ */
+ private Context getJndiContext(Properties props) throws ServiceException {
+ Context ctx;
+ try {
+ ctx = new InitialContext(props);
+ }
+ catch (NamingException e) {
+ LOG.warn("Unable to initialize the context :", e);
+ throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
+ }
+ return ctx;
+ }
+
+ @Override
+ public void finalize() {
+ LOG.info("Finalizing ");
+ for (Entry<String, ConnectionContext> entry : connSessionMap.entrySet()) {
+ try {
+ entry.getValue().getConnection().close();
+ }
+ catch (JMSException e) {
+ LOG.warn("Unable to close the connection for " + entry.getKey(), e);
+ }
+ }
+ }
+
+ /**
+ * This class maintains a JMS connection and map of topic to Session. Only
+ * one session per topic.
+ */
+ class ConnectionContext {
+ Connection connection;
+ HashMap<String, Session> hmSessionTopic = new HashMap<String, Session>();
+
+ public ConnectionContext(Connection conn) {
+ this.connection = conn;
+ }
+
+ /**
+ * If there is no existing session for a specific topic name, this
+ * method creates a new session. Otherwise, return the existing seesion
+ *
+ * @param topic : Name of the topic
+ * @return a new/exiting JMS session
+ * @throws JMSException
+ */
+ public Session getSession(String topic) throws JMSException {
+ Session ret;
+ if (hmSessionTopic.containsKey(topic)) {
+ ret = hmSessionTopic.get(topic);
+ }
+ else {
+ int sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
+ ret = connection.createSession(false, sessionOpts);
+ hmSessionTopic.put(topic, ret);
+ }
+ return ret;
+ }
+
+ /**
+ * Returns a new MessageConsumer object.
+ * It is the caller responsibility to close the MessageConsumer when done
+ *
+ * @param topicName : Name of the topic
+ * @return MessageConsumer
+ * @throws JMSException
+ */
+ public MessageConsumer getConsumer(String topicName) throws JMSException {
+ Session session = getSession(topicName);
+ Topic topic = session.createTopic(topicName);
+ MessageConsumer consumer = session.createConsumer(topic);
+ return consumer;
+ }
+
+ /**
+ * Returns a new MessageProducer object.
+ * It is the caller responsibility to close the MessageProducer when done
+ *
+ * @param topicName : Name of the topic
+ * @return MessageProducer
+ * @throws JMSException
+ */
+ public MessageProducer getProducer(String topicName) throws JMSException {
+ Session session = getSession(topicName);
+ Topic topic = session.createTopic(topicName);
+ MessageProducer producer = session.createProducer(topic);
+ return producer;
+ }
+
+ /**
+ * Close an existing session and remove from the Map
+ *
+ * @param topic : Name of a topic
+ * @throws JMSException
+ */
+ public void returnSession(String topic) throws JMSException {
+ if (hmSessionTopic.containsKey(topic)) {
+ Session sess = hmSessionTopic.get(topic);
+ sess.close();
+ hmSessionTopic.remove(topic);
+ }
+ else {
+ LOG.info("Topic " + topic + " does n't have any active session to close ");
+ }
+ }
+
+ /**
+ * @return JMS connection
+ */
+ public Connection getConnection() {
+ return connection;
+ }
+
+ /**
+ * Set JMS connection
+ *
+ * @param connection
+ */
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml?rev=1402670&r1=1402669&r2=1402670&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml (original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml Fri Oct 26 21:29:35 2012
@@ -129,6 +129,20 @@
</description>
</property>
+ <!-- JMSAccessorService -->
+ <property>
+ <name>oozie.service.JMSAccessorService.connections</name>
+ <value>
+ default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616,
+ </value>
+ <description>
+ Specify the map of endpoints to JMS configuration properties. In general, endpoint
+ identifies the HCatalog server URL. "default" is used if no endpoint is mentioned
+ in the query. If some JMS property is not defined, the system will use the property
+ defined jndi.properties. jndi.properties files is retrieved from the application classpath.
+ </description>
+ </property>
+
<!-- ConfigurationService -->
<property>
Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1402670&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java Fri Oct 26 21:29:35 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.Arrays;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.oozie.test.XTestCase;
+
+public class TestJMSAccessorService extends XTestCase {
+ private Services services;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ Configuration conf = services.getConf();
+ conf.set(Services.CONF_SERVICE_CLASSES,
+ StringUtils.join(",", Arrays.asList(JMSAccessorService.class.getName())));
+ conf.set(
+ JMSAccessorService.JMS_CONNECTIONS_PROPERTIES,
+ "default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false,");
+ services.init();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testService() {
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ Assert.assertNotNull(jmsService);
+ }
+
+ public void testConsumer() throws Exception {
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ MessageConsumer consumer = null;
+ try {
+ consumer = jmsService.getMessageConsumer(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
+ assert (consumer != null);
+ }
+ finally {
+ if (consumer != null) {
+ jmsService.removeSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
+ }
+ }
+ }
+
+ public void testProducer() throws Exception {
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ MessageProducer producer = null;
+ try {
+ producer = jmsService.getMessageProducer(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
+ assert (producer != null);
+ }
+ finally {
+ if (producer != null) {
+ jmsService.removeSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
+ }
+ }
+ }
+
+ public void testSession() throws Exception {
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ Session sess = null;
+ try {
+ sess = jmsService.getSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
+ assert (sess != null);
+ }
+ finally {
+ if (sess != null) {
+ jmsService.removeSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
+ }
+ }
+ }
+
+ public void testConnection() {
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ JMSAccessorService.ConnectionContext conCtx = jmsService
+ .getConnectionContext(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
+ assert (conCtx.getConnection() != null);
+ }
+}
Modified: oozie/branches/hcat-intre/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/pom.xml?rev=1402670&r1=1402669&r2=1402670&view=diff
==============================================================================
--- oozie/branches/hcat-intre/pom.xml (original)
+++ oozie/branches/hcat-intre/pom.xml Fri Oct 26 21:29:35 2012
@@ -457,6 +457,12 @@
<version>${hive.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-all</artifactId>
+ <version>5.7.0</version>
+ </dependency>
+
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1402670&r1=1402669&r2=1402670&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Fri Oct 26 21:29:35 2012
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1032 Create JMSService used for any JMS compliant product (Mohammad)
OOZIE-1027 Command line mr does not support NN/JT parameters properly (Mona via Mohammad)
OOZIE-1020 BulkJPAExecutor handling date-time value incorrectly.(Mona via Mohammad)`
OOZIE-967 Coordinator action window in web UI never finishes refreshing (kinley via tucu)