You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ka...@apache.org on 2012/10/26 23:29:35 UTC

svn commit: r1402670 - in /oozie/branches/hcat-intre: ./ core/ core/src/main/java/org/apache/oozie/service/ core/src/main/resources/ core/src/test/java/org/apache/oozie/service/

Author: kamrul
Date: Fri Oct 26 21:29:35 2012
New Revision: 1402670

URL: http://svn.apache.org/viewvc?rev=1402670&view=rev
Log:
OOZIE-1032 Create JMSService used for any JMS compliant product (Mohammad)

Added:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
Modified:
    oozie/branches/hcat-intre/core/pom.xml
    oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
    oozie/branches/hcat-intre/pom.xml
    oozie/branches/hcat-intre/release-log.txt

Modified: oozie/branches/hcat-intre/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/pom.xml?rev=1402670&r1=1402669&r2=1402670&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/pom.xml (original)
+++ oozie/branches/hcat-intre/core/pom.xml Fri Oct 26 21:29:35 2012
@@ -265,6 +265,12 @@
             <scope>compile</scope>
         </dependency>
 
+           <dependency>
+                <groupId>org.apache.activemq</groupId>
+                <artifactId>activemq-all</artifactId>
+                <scope>compile</scope>
+           </dependency>
+
         <!-- For drawing runtime DAG -->
         <dependency>
             <groupId>net.sf.jung</groupId>

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1402670&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java Fri Oct 26 21:29:35 2012
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Service that creates and manages JMS connections and sessions. It:
+ * <ol>
+ * <li>creates/manages JMS connections using user-configured properties,</li>
+ * <li>creates/manages one session per connection/topic,</li>
+ * <li>provides a way to create a subscriber (consumer) and a publisher (producer),</li>
+ * <li>is pure JMS compliant (implementation independent, but primarily tested
+ * against Apache ActiveMQ).</li>
+ * </ol>
+ * Connection properties are read from the Oozie configuration (oozie-site.xml).
+ * Since multiple connections are supported, the properties of each connection
+ * are grouped under a tag (the end point). The caller uses that tag to access
+ * the connection/session/subscriber/producer.
+ */
+public class JMSAccessorService implements Service {
+    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
+    public static final String JMS_CONNECTION_FACTORY = CONF_PREFIX + "jms.connectionFactory";
+    public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "connections";
+    public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
+    public static final String DEFAULT_SERVER_ENDPOINT = "default";
+
+    // Assigned at class load so that logging also works before init() has run.
+    private static XLog LOG = XLog.getLog(JMSAccessorService.class);
+    private Configuration conf;
+    ConcurrentHashMap<String, ConnectionContext> connSessionMap = new ConcurrentHashMap<String, ConnectionContext>();
+    HashMap<String, Properties> hmConnProps = new HashMap<String, Properties>();
+
+    /**
+     * Reads the connection properties from the services configuration and
+     * opens one JMS connection per configured end point.
+     *
+     * @param services : services instance whose configuration is used
+     * @throws ServiceException if a configured connection cannot be established
+     */
+    @Override
+    public void init(Services services) throws ServiceException {
+        LOG = XLog.getLog(getClass());
+        conf = services.getConf();
+        parseConfiguration(conf);
+        establishConnections();
+    }
+
+    /**
+     * Returns Consumer object for specific service end point and topic name
+     *
+     * @param endPoint : Service end-point (preferably HCatalog server address)
+     *        to determine the JMS connection properties
+     * @param topicName : topic to listen on
+     * @return : MessageConsumer to receive JMS message, or null if there is no
+     *         connection for the end point
+     * @throws JMSException
+     */
+    public MessageConsumer getMessageConsumer(String endPoint, String topicName) throws JMSException {
+        ConnectionContext connCtx = getConnectionContext(endPoint);
+        return (connCtx == null) ? null : connCtx.getConsumer(topicName);
+    }
+
+    /**
+     * Returns Producer object for specific service end point and topic name
+     *
+     * @param endPoint : Service end-point (preferably HCatalog server address)
+     *        to determine the JMS connection properties
+     * @param topicName : topic to send message
+     * @return : MessageProducer to send JMS message, or null if there is no
+     *         connection for the end point
+     * @throws JMSException
+     */
+    public MessageProducer getMessageProducer(String endPoint, String topicName) throws JMSException {
+        ConnectionContext connCtx = getConnectionContext(endPoint);
+        return (connCtx == null) ? null : connCtx.getProducer(topicName);
+    }
+
+    /**
+     * Returns JMS session object for specific service end point and topic name
+     *
+     * @param endPoint : Service end-point (preferably HCatalog server address)
+     *        to determine the JMS connection properties
+     * @param topicName : topic to listen on
+     * @return : Session to send/receive JMS message, or null if there is no
+     *         connection for the end point
+     * @throws JMSException
+     */
+    public Session getSession(String endPoint, String topicName) throws JMSException {
+        ConnectionContext connCtx = getConnectionContext(endPoint);
+        return (connCtx == null) ? null : connCtx.getSession(topicName);
+    }
+
+    /**
+     * Returns JMS connection context object for specific service end point
+     *
+     * @param endPoint : Service end-point (preferably HCatalog server address)
+     *        to determine the JMS connection properties
+     * @return : Connection context to send/receive JMS message, or null if
+     *         there is no connection for the end point
+     */
+    public ConnectionContext getConnectionContext(String endPoint) {
+        // ConcurrentHashMap does not accept null keys; treat null as "no connection".
+        ConnectionContext ret = (endPoint == null) ? null : connSessionMap.get(endPoint);
+        if (ret == null) {
+            LOG.warn("Connection doesn't exists for end point " + endPoint);
+        }
+        return ret;
+    }
+
+    /**
+     * Remove JMS session object for specific service end point and topic name
+     *
+     * @param endPoint : Service end-point (preferably HCatalog server address)
+     *        to determine the JMS connection properties
+     * @param topicName : topic whose session is closed and removed
+     * @throws JMSException
+     */
+    public void removeSession(String endPoint, String topicName) throws JMSException {
+        ConnectionContext connCtx = getConnectionContext(endPoint);
+        if (connCtx != null) {
+            connCtx.returnSession(topicName);
+        }
+    }
+
+    /*
+     * Open one connection per parsed end point. If one of them fails, the
+     * connections opened so far are closed before the error is propagated.
+     */
+    private void establishConnections() throws ServiceException {
+        try {
+            for (Entry<String, Properties> entry : hmConnProps.entrySet()) {
+                connSessionMap.put(entry.getKey(), new ConnectionContext(getConnection(entry.getValue())));
+            }
+        }
+        catch (ServiceException e) {
+            closeConnections();
+            throw e;
+        }
+    }
+
+    /*
+     * Read the configured endPoint=properties entries and parse each non-blank one.
+     */
+    private void parseConfiguration(Configuration conf) {
+        String[] keyVals = conf.getStrings(JMS_CONNECTIONS_PROPERTIES, "");
+        for (String kVal : keyVals) {
+            LOG.info("Key=value " + kVal);
+            if (kVal.trim().length() > 0) {
+                addToHM(kVal);
+            }
+        }
+    }
+
+    /*
+     * Parse one entry of the form endPoint=key#value;key#value and store the
+     * properties under the trimmed end point name.
+     */
+    private void addToHM(String kVal) {
+        int pos = kVal.indexOf("=");
+        if (pos <= 0) {
+            LOG.info("Unformatted properties. Expected two parts : " + kVal);
+            return;
+        }
+        Properties props = new Properties();
+        for (String pair : kVal.substring(pos + 1).split(";")) {
+            // Split at the first '#' only, so that a value containing '#' is kept whole.
+            String[] kV = pair.split("#", 2);
+            if (kV.length > 1 && kV[1].length() > 0) {
+                props.put(kV[0].trim(), kV[1].trim());
+            }
+            else {
+                LOG.info("Unformatted properties. Expected key#value : " + pair);
+            }
+        }
+        String key = kVal.substring(0, pos);
+        LOG.info(key + ": Adding " + props);
+        hmConnProps.put(key.trim(), props);
+    }
+
+    /**
+     * Closes all JMS connections opened by this service and forgets the parsed
+     * connection properties.
+     */
+    @Override
+    public void destroy() {
+        closeConnections();
+        hmConnProps.clear();
+    }
+
+    @Override
+    public Class<? extends Service> getInterface() {
+        return JMSAccessorService.class;
+    }
+
+    /*
+     * Look up the connection factory, create the connection and start it.
+     */
+    private Connection getConnection(Properties props) throws ServiceException {
+        Connection conn = null;
+        try {
+            Context jndiContext = getJndiContext(props);
+            String connFacName = (String) jndiContext.getEnvironment().get(JMS_CONNECTION_FACTORY);
+            if (connFacName == null || connFacName.trim().length() == 0) {
+                connFacName = "ConnectionFactory";
+            }
+            ConnectionFactory connFac = (ConnectionFactory) jndiContext.lookup(connFacName);
+            LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
+            conn = connFac.createConnection();
+            // Register the listener before start() so no asynchronous error goes unreported.
+            conn.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException je) {
+                    LOG.error(je);
+                }
+            });
+            conn.start();
+        }
+        catch (NamingException e) {
+            throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
+        }
+        catch (JMSException e) {
+            // Do not leak a connection that was created but could not be set up.
+            closeConnection(props.getProperty(Context.PROVIDER_URL), conn);
+            throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
+        }
+        return conn;
+    }
+
+    /*
+     * Create a JNDI API InitialContext object
+     */
+    private Context getJndiContext(Properties props) throws ServiceException {
+        try {
+            return new InitialContext(props);
+        }
+        catch (NamingException e) {
+            LOG.warn("Unable to initialize the context :", e);
+            throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Last-resort cleanup; {@link #destroy()} is the regular way to release the
+     * connections.
+     */
+    @Override
+    public void finalize() {
+        LOG.info("Finalizing ");
+        closeConnections();
+    }
+
+    /*
+     * Close every open connection and empty the end point map. A failure to
+     * close one connection is logged and does not stop the others.
+     */
+    private void closeConnections() {
+        for (Entry<String, ConnectionContext> entry : connSessionMap.entrySet()) {
+            closeConnection(entry.getKey(), entry.getValue().getConnection());
+        }
+        connSessionMap.clear();
+    }
+
+    /*
+     * Close a single connection (if any), logging instead of throwing on failure.
+     */
+    private void closeConnection(String name, Connection conn) {
+        if (conn == null) {
+            return;
+        }
+        try {
+            conn.close();
+        }
+        catch (JMSException e) {
+            LOG.warn("Unable to close the connection for " + name, e);
+        }
+    }
+
+    /**
+     * This class maintains a JMS connection and map of topic to Session. Only
+     * one session per topic. The map is a plain HashMap, so an instance must
+     * not be used by several threads without external synchronization.
+     */
+    class ConnectionContext {
+        Connection connection;
+        HashMap<String, Session> hmSessionTopic = new HashMap<String, Session>();
+
+        public ConnectionContext(Connection conn) {
+            this.connection = conn;
+        }
+
+        /**
+         * If there is no existing session for a specific topic name, this
+         * method creates a new session. Otherwise, return the existing session
+         *
+         * @param topic : Name of the topic
+         * @return a new/existing JMS session
+         * @throws JMSException
+         */
+        public Session getSession(String topic) throws JMSException {
+            Session ret = hmSessionTopic.get(topic);
+            if (ret == null) {
+                int sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
+                ret = connection.createSession(false, sessionOpts);
+                hmSessionTopic.put(topic, ret);
+            }
+            return ret;
+        }
+
+        /**
+         * Returns a new MessageConsumer object.
+         * It is the caller responsibility to close the MessageConsumer when done
+         *
+         * @param topicName : Name of the topic
+         * @return MessageConsumer
+         * @throws JMSException
+         */
+        public MessageConsumer getConsumer(String topicName) throws JMSException {
+            Session session = getSession(topicName);
+            Topic topic = session.createTopic(topicName);
+            return session.createConsumer(topic);
+        }
+
+        /**
+         * Returns a new MessageProducer object.
+         * It is the caller responsibility to close the MessageProducer when done
+         *
+         * @param topicName : Name of the topic
+         * @return MessageProducer
+         * @throws JMSException
+         */
+        public MessageProducer getProducer(String topicName) throws JMSException {
+            Session session = getSession(topicName);
+            Topic topic = session.createTopic(topicName);
+            return session.createProducer(topic);
+        }
+
+        /**
+         * Close an existing session and remove from the Map
+         *
+         * @param topic : Name of a topic
+         * @throws JMSException
+         */
+        public void returnSession(String topic) throws JMSException {
+            // Remove first: even if close() fails, a broken session must not be handed out again.
+            Session sess = hmSessionTopic.remove(topic);
+            if (sess != null) {
+                sess.close();
+            }
+            else {
+                LOG.info("Topic " + topic + " does n't have any active session to close ");
+            }
+        }
+
+        /**
+         * @return JMS connection
+         */
+        public Connection getConnection() {
+            return connection;
+        }
+
+        /**
+         * Set JMS connection
+         *
+         * @param connection
+         */
+        public void setConnection(Connection connection) {
+            this.connection = connection;
+        }
+    }
+
+}

Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml?rev=1402670&r1=1402669&r2=1402670&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml (original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml Fri Oct 26 21:29:35 2012
@@ -129,6 +129,20 @@
         </description>
     </property>
 
+ <!-- JMSAccessorService -->
+   <property>
+        <name>oozie.service.JMSAccessorService.connections</name>
+        <value>
+        default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616,
+        </value>
+        <description>
+        Specify the map of endpoints to JMS configuration properties. In general, endpoint
+        identifies the HCatalog server URL. "default" is used if no endpoint is mentioned
+        in the query. If some JMS property is not defined, the system will use the property
+        defined in jndi.properties. The jndi.properties file is retrieved from the application classpath.
+        </description>
+    </property>
+
     <!-- ConfigurationService -->
 
     <property>

Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1402670&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java Fri Oct 26 21:29:35 2012
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.Arrays;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.oozie.test.XTestCase;
+
+/**
+ * Tests {@link JMSAccessorService} using the ActiveMQ JNDI context factory and
+ * an in-VM, non-persistent broker URL for the "default" end point.
+ */
+public class TestJMSAccessorService extends XTestCase {
+    private static final String TEST_TOPIC = "test-topic";
+
+    private Services services;
+
+    /**
+     * Starts a Services instance configured with JMSAccessorService as its service class.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        Configuration conf = services.getConf();
+        conf.set(Services.CONF_SERVICE_CLASSES,
+                StringUtils.join(",", Arrays.asList(JMSAccessorService.class.getName())));
+        conf.set(
+                JMSAccessorService.JMS_CONNECTIONS_PROPERTIES,
+                "default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false,");
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            services.destroy();
+        }
+        finally {
+            // Always run the base-class cleanup, even if stopping the services fails.
+            super.tearDown();
+        }
+    }
+
+    public void testService() {
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        Assert.assertNotNull(jmsService);
+    }
+
+    public void testConsumer() throws Exception {
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        MessageConsumer consumer = null;
+        try {
+            consumer = jmsService.getMessageConsumer(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, TEST_TOPIC);
+            // JUnit assertion: the Java "assert" keyword is skipped unless the JVM runs with -ea.
+            Assert.assertNotNull(consumer);
+        }
+        finally {
+            if (consumer != null) {
+                jmsService.removeSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, TEST_TOPIC);
+            }
+        }
+    }
+
+    public void testProducer() throws Exception {
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        MessageProducer producer = null;
+        try {
+            producer = jmsService.getMessageProducer(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, TEST_TOPIC);
+            Assert.assertNotNull(producer);
+        }
+        finally {
+            if (producer != null) {
+                jmsService.removeSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, TEST_TOPIC);
+            }
+        }
+    }
+
+    public void testSession() throws Exception {
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        Session sess = null;
+        try {
+            sess = jmsService.getSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, TEST_TOPIC);
+            Assert.assertNotNull(sess);
+        }
+        finally {
+            if (sess != null) {
+                jmsService.removeSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, TEST_TOPIC);
+            }
+        }
+    }
+
+    public void testConnection() {
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        JMSAccessorService.ConnectionContext conCtx = jmsService
+                .getConnectionContext(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
+        Assert.assertNotNull(conCtx);
+        Assert.assertNotNull(conCtx.getConnection());
+    }
+}

Modified: oozie/branches/hcat-intre/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/pom.xml?rev=1402670&r1=1402669&r2=1402670&view=diff
==============================================================================
--- oozie/branches/hcat-intre/pom.xml (original)
+++ oozie/branches/hcat-intre/pom.xml Fri Oct 26 21:29:35 2012
@@ -457,6 +457,12 @@
                 <version>${hive.version}</version>
             </dependency>
 
+           <dependency>
+                <groupId>org.apache.activemq</groupId>
+                <artifactId>activemq-all</artifactId>
+                <version>5.7.0</version>
+           </dependency>
+
             <dependency>
                 <groupId>commons-logging</groupId>
                 <artifactId>commons-logging</artifactId>

Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1402670&r1=1402669&r2=1402670&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Fri Oct 26 21:29:35 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1032 Create JMSService used for any JMS compliant product (Mohammad)
 OOZIE-1027 Command line mr does not support NN/JT parameters properly (Mona via Mohammad)
 OOZIE-1020 BulkJPAExecutor handling date-time value incorrectly.(Mona via Mohammad)`
 OOZIE-967 Coordinator action window in web UI never finishes refreshing (kinley via tucu)