You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jetspeed-dev@portals.apache.org by jf...@apache.org on 2004/07/23 02:24:01 UTC

cvs commit: jakarta-jetspeed/src/java/org/apache/jetspeed/services/messaging/jms AbstractJMSMessagingService.java

jford       2004/07/22 17:24:01

  Added:       src/java/org/apache/jetspeed/services/messaging
                        MessagingService.java
               src/java/org/apache/jetspeed/services/messaging/jms/activemq
                        MessagingServiceImpl.java
               src/java/org/apache/jetspeed/services/messaging/jms
                        AbstractJMSMessagingService.java
  Log:
  MessagingService implemenation
  
  Revision  Changes    Path
  1.1                  jakarta-jetspeed/src/java/org/apache/jetspeed/services/messaging/MessagingService.java
  
  Index: MessagingService.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package org.apache.jetspeed.services.messaging;
  
  import javax.jms.Message;
  import javax.jms.MessageConsumer;
  import javax.jms.MessageListener;
  
  /**
   * @author <a href="mailto:jford@apache.org">Jeremy Ford</a>
   * @version $Id: MessagingService.java,v 1.1 2004/07/23 00:24:01 jford Exp $
   */
  public interface MessagingService
  {
      public static final String SERVICE_NAME = "MessagingService";
      
      public static final String USER_UPDATE_SUBJECT = "org.apache.jetspeed.services.messaging.user.update";
      public static final String GROUP_UPDATE_SUBJECT = "org.apache.jetspeed.services.messaging.group.update";
      public static final String ROLE_UPDATE_SUBJECT = "org.apache.jetspeed.services.messaging.role.update";
      public static final String SECURITY_UPDATE_SUBJECT = "org.apache.jetspeed.services.messaging.security.update";
      
      public static final int MESSAGE = 0;
      public static final int TEXT_MESSAGE = 1;
      public static final int BYTE_MESSAGE = 2;
      public static final int MAP_MESSAGE = 3;
      public static final int OBJECT_MESSAGE = 4;
      public static final int STREAM_MESSAGE = 5;
  
      public static final String REGISTRY_UPDATE_SUBJECT = "org.apache.jetspeed.services.registry.update";
      
      /**
       * This method creates a new consumer if one with the given id has not been created.
       * The consumer will be closed during the services shutdown.
       * 
       * @param listener
       * @param id
       * @param destination
       */
      public void addMessageListener(MessageListener listener, String id, String destination);
      
      /**
       * This method creates a new MessageConsumer.
       * It's lifecycle should be managed by the caller.
       * 
       * @param listener
       * @param destination
       */
      public MessageConsumer createConsumer(String destination);
      
      public void removeMessageListener(String id);
      
      public void sendMessage(Message message, String destination);
      public Message createMessage(int messageType);
  }
  
  
  
  1.1                  jakarta-jetspeed/src/java/org/apache/jetspeed/services/messaging/jms/activemq/MessagingServiceImpl.java
  
  Index: MessagingServiceImpl.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package org.apache.jetspeed.services.messaging.jms.activemq;
  
  import javax.jms.DeliveryMode;
  import javax.jms.JMSException;
  import javax.jms.Session;
  
  import org.apache.jetspeed.services.messaging.jms.AbstractJMSMessagingService;
  import org.codehaus.activemq.ActiveMQConnection;
  import org.codehaus.activemq.ActiveMQConnectionFactory;
  
  /**
   * @author <a href="mailto:jford@apache.org">Jeremy Ford</a>
   * @version $Id: MessagingServiceImpl.java,v 1.1 2004/07/23 00:24:01 jford Exp $
   */
  public class MessagingServiceImpl extends AbstractJMSMessagingService
  {
      protected static final String DEFAULT_FACTORY = "JmsTopicConnectionFactory";
      
      protected static final String CONFIG_USER = "user";
      protected static final String CONFIG_PWD = "pwd";
      
      protected static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;
      protected static final String DEFAULT_PWD = ActiveMQConnection.DEFAULT_PASSWORD;
      
      private String user;
      private String pwd;
  
      
      public MessagingServiceImpl()
      {
          this.DEFAULT_URL = ActiveMQConnection.DEFAULT_URL;
          this.DEFAULT_ACK_MODE = Session.AUTO_ACKNOWLEDGE;
          this.DEFAULT_DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
          this.DEFAULT_DURABLE = false;
          this.DEFAULT_NOLOCAL = true;
          this.DEFAULT_TRANSACTED = false;
      }
      
      protected void initJMSConfiguration()
      {
          super.initJMSConfiguration();
          
          user = getConfiguration().getString(CONFIG_USER, DEFAULT_USER);
          pwd = getConfiguration().getString(CONFIG_PWD, DEFAULT_PWD);
      }
          
      protected void createConnection() throws JMSException {
          String providerURL = getProviderURL();
          ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, providerURL);
          connection = connectionFactory.createConnection();
      }
      
      protected void createSession() throws JMSException {
          session = connection.createSession(transacted, ackMode);
      }
  }
  
  
  
  1.1                  jakarta-jetspeed/src/java/org/apache/jetspeed/services/messaging/jms/AbstractJMSMessagingService.java
  
  Index: AbstractJMSMessagingService.java
  ===================================================================
  /*
   * Copyright 2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package org.apache.jetspeed.services.messaging.jms;
  
  import java.util.Hashtable;
  import java.util.Iterator;
  
  import javax.jms.Connection;
  import javax.jms.DeliveryMode;
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.Message;
  import javax.jms.MessageConsumer;
  import javax.jms.MessageListener;
  import javax.jms.MessageProducer;
  import javax.jms.Session;
  import javax.jms.Topic;
  import javax.naming.Context;
  import javax.naming.InitialContext;
  import javax.naming.NamingException;
  import javax.servlet.ServletConfig;
  
  import org.apache.jetspeed.services.messaging.MessagingService;
  import org.apache.turbine.services.InitializationException;
  import org.apache.turbine.services.TurbineBaseService;
  
  /**
   * @author <a href="mailto:jford@apache.org">Jeremy Ford</a>
   * @version $Id: AbstractJMSMessagingService.java,v 1.1 2004/07/23 00:24:01 jford Exp $
   */
  public abstract class AbstractJMSMessagingService extends TurbineBaseService
  	implements MessagingService
  {
      protected static final String CONFIG_SCHEME = "scheme";
      protected static final String CONFIG_HOST = "host";
      protected static final String CONFIG_PORT = "port";
      protected static final String CONFIG_NAME = "name";
      protected static final String CONFIG_URL = "url";
      
      protected static final String CONFIG_CONNECTION_FACTORY = "connection_factory";
      
      protected static final String CONFIG_TRANSACTED = "transacted";
      protected static final String CONFIG_ACK_MODE = "ack_mode";
      protected static final String CONFIG_TOPIC = "topic_enabled";
      protected static final String CONFIG_DURABLE = "durable";
      protected static final String CONFIG_NOLOCAL = "nolocal";
      
      //producer properties
      protected static final String CONFIG_DELIVERY_MODE = "delivery_mode";
      protected static final String CONFIG_PRIORITY = "priority";
      protected static final String CONFIG_TIME_TO_LIVE = "time_to_live";
  
      protected String DEFAULT_SCHEME;
      protected String DEFAULT_HOST;
      protected String DEFAULT_PORT;
      protected String DEFAULT_NAME = "";
      protected String DEFAULT_URL;
      
      protected String DEFAULT_CONNECTION_FACTORY;
      
      protected boolean DEFAULT_TRANSACTED = false;
      protected int DEFAULT_ACK_MODE  = Session.AUTO_ACKNOWLEDGE;
      protected boolean DEFAULT_TOPIC = true;
      protected boolean DEFAULT_DURABLE = false;
      protected boolean DEFAULT_NOLOCAL = true;
      
      protected int DEFAULT_DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
      protected int DEFAULT_PRIORITY = 0; //0<x<10
      protected int DEFAULT_TIME_TO_LIVE = 0;
      
      //Acknowledgement keys
      protected static final String AUTO_ACKNOWLEDGE = "auto";
      protected static final String CLIENT_ACKNOWLEDGE = "client";
      protected static final String DUPS_OK_ACKNOWLEDGE = "dups_ok";
      
      //Delivery Mode Key
      protected static final String PERSISTENT = "persistent";
      
      //url properties
      protected String scheme;
      protected String host;
      protected String port;
      protected String name;
      protected String url;
      
      //connection factory properties
      protected String connectionFactory;
      protected String initialContextFactory;
      
      //default producer properties
      protected int deliveryMode;
      protected int priority;
      protected int timeToLive;
  
      //message properties
      protected boolean transacted = false;
      protected int ackMode;
      
      protected boolean use_topic;
      protected boolean durable;
      protected boolean nolocal;
      
      protected Hashtable destinations = new Hashtable();
      protected Hashtable consumers = new Hashtable();
      protected Hashtable producers = new Hashtable();
      
      protected Connection connection;
      protected Session session;
      
      /**
       * This is the early initialization method called by the
       * Turbine <code>Service</code> framework
       */
      public synchronized void init(ServletConfig conf) throws InitializationException
      {
          try
          {
              initJMSConfiguration();
              
              createConnection();
              
              connection.start();
              
              createSession();
          }
          catch(JMSException e)
          {
              throw new InitializationException(e.getMessage(), e);
          }
          catch(Exception e)
          {
              throw new InitializationException(e.getMessage(), e);
          }
      }
      
      protected void initJMSConfiguration()
      {
          scheme = getConfiguration().getString(CONFIG_SCHEME, DEFAULT_SCHEME);
          host = getConfiguration().getString(CONFIG_HOST, DEFAULT_HOST);
          port = getConfiguration().getString(CONFIG_PORT, DEFAULT_PORT);
          name = getConfiguration().getString(CONFIG_NAME, DEFAULT_NAME);
          url = getConfiguration().getString(CONFIG_URL, DEFAULT_URL);
          
          connectionFactory = getConfiguration().getString(CONFIG_CONNECTION_FACTORY, DEFAULT_CONNECTION_FACTORY);
          
          transacted = getConfiguration().getBoolean(CONFIG_TRANSACTED, DEFAULT_TRANSACTED);
          
          use_topic = getConfiguration().getBoolean(CONFIG_TOPIC, DEFAULT_TOPIC);
          durable = getConfiguration().getBoolean(CONFIG_DURABLE, DEFAULT_DURABLE);
          nolocal = getConfiguration().getBoolean(CONFIG_NOLOCAL, DEFAULT_NOLOCAL);
          
          ackMode = DEFAULT_ACK_MODE;
          String ackModeStr = getConfiguration().getString(CONFIG_ACK_MODE);
          if(ackModeStr != null)
          {
              if(ackModeStr.equals(AUTO_ACKNOWLEDGE))
              {
                  ackMode = Session.AUTO_ACKNOWLEDGE;
              }
              else if(ackModeStr.equals(CLIENT_ACKNOWLEDGE))
              {
                  ackMode = Session.CLIENT_ACKNOWLEDGE;
              }
              else if(ackModeStr.equals(DUPS_OK_ACKNOWLEDGE))
              {
                  ackMode = Session.DUPS_OK_ACKNOWLEDGE;
              }
          }
          
          //producer related config
          deliveryMode = DEFAULT_DELIVERY_MODE;
          String deliveryModeStr = getConfiguration().getString(CONFIG_DELIVERY_MODE);
          if(deliveryModeStr != null)
          {
  	        if(deliveryModeStr.equals(PERSISTENT))
  	        {
  	            deliveryMode = DeliveryMode.PERSISTENT;
  	        }
  	        else
  	        {
  	            deliveryMode = DeliveryMode.NON_PERSISTENT;
  	        }
          }
          priority = getConfiguration().getInt(CONFIG_PRIORITY, DEFAULT_PRIORITY);
          timeToLive = getConfiguration().getInt(CONFIG_TIME_TO_LIVE, DEFAULT_TIME_TO_LIVE);
      }
      
      protected String getProviderURL()
      {
          String providerURL = url;
          
          if(providerURL == null)
          {
              if((scheme != null) && (host != null) && (port != null) && (name != null))
              {
                  providerURL = scheme + "://" + host + ":" + port + "/" + name;
              }
          }
          return providerURL;
      }
      
      
      protected Context getContext() throws NamingException
      {
          Hashtable properties = new Hashtable();
      	
          properties.put(
              Context.INITIAL_CONTEXT_FACTORY, 
              initialContextFactory);
      
          String providerURL = getProviderURL();
          properties.put(Context.PROVIDER_URL, providerURL);
          
          return new InitialContext(properties);
      }
      
      protected void createSession() throws JMSException
      {
          session = connection.createSession(transacted, ackMode);
      }
      
      protected abstract void createConnection() throws JMSException;
      
      
      /* (non-Javadoc)
       * @see org.apache.jetspeed.services.messaging.MessagingService#createMessage(int)
       */
      public Message createMessage(int messageType)
      {
          Message message = null;
          
          try
          {
  	        if(messageType == TEXT_MESSAGE)
  	        {
  	            message = session.createTextMessage();
  	        }
  	        else if(messageType == BYTE_MESSAGE)
  	        {
  	            message = session.createBytesMessage();
  	        }
  	        else if(messageType == MAP_MESSAGE)
  	        {
  	            message = session.createMapMessage();
  	        }
  	        else if(messageType == OBJECT_MESSAGE)
  	        {
  	            message = session.createObjectMessage();
  	        }
  	        else if(messageType == STREAM_MESSAGE)
  	        {
  	            message = session.createStreamMessage();
  	        }
  	        else
  	        {
  	            message = session.createMessage();
  	        }
          }
          catch(JMSException e)
          {
              e.printStackTrace();
          }
          
          return message;
      }
      
      /**
       * This is the shutdown method called by the
       * Turbine <code>Service</code> framework
       */
      public void shutdown()
      {
          Iterator keyIter = consumers.keySet().iterator();
          while (keyIter.hasNext())
          {
              String id = (String) keyIter.next();
              MessageConsumer consumer = (MessageConsumer)consumers.get(id);
              try
              {
                  System.out.println("Closing consumer for uniqueid " + id);
                  consumer.close();
              }
              catch (JMSException e2)
              {
                  // TODO Auto-generated catch block
                  e2.printStackTrace();
              }
          }
          
          Iterator producerKeyIter = producers.keySet().iterator();
          while (producerKeyIter.hasNext())
          {
              String id = (String) producerKeyIter.next();
              MessageProducer producer = (MessageProducer)producers.get(id);
              try
              {
                  System.out.println("Closing producer for destinaton " + id);
                  producer.close();
              }
              catch (JMSException e2)
              {
                  // TODO Auto-generated catch block
                  e2.printStackTrace();
              }
          }
          
          try
          {
              System.out.println("closing session");
              session.close();
          }
          catch (JMSException e)
          {
              // TODO Auto-generated catch block
              e.printStackTrace();
          }
          try
          {
              System.out.println("closing connection");
              connection.stop();
              connection.close();
          }
          catch (JMSException e1)
          {
              // TODO Auto-generated catch block
              e1.printStackTrace();
          }
      }
      
      /* (non-Javadoc)
       * @see org.apache.jetspeed.services.messaging.MessagingService#addMessageListener(java.lang.Object)
       */
      public void addMessageListener(MessageListener listener, String id, String destination)
      {
          try
          {
              Destination consumerDestination = getDestination(destination);
              MessageConsumer consumer = (MessageConsumer) consumers.get(id);
              if(consumer == null)
              {
  	            if(this.use_topic && this.durable)
  	            {
  	                if(durable)
  	                {
  	                    consumer = session.createDurableSubscriber((Topic)consumerDestination, id, "", nolocal); //topic, uniqueID, selector, nolocal
  	                }
  	            }
  	            else
  	            {
  	                consumer = session.createConsumer(consumerDestination, "", nolocal);
  	            }
              }
              consumer.setMessageListener(listener);
              consumers.put(id, consumer);
          }
          catch (JMSException e)
          {
              // TODO Auto-generated catch block
              e.printStackTrace();
          }
      }
      
      public MessageConsumer createConsumer(String destination)
      {
          MessageConsumer consumer = null;
          try
          {
              Destination consumerDestination = getDestination(destination);
              consumer = session.createConsumer(consumerDestination);
          }
          catch (JMSException e)
          {
              // TODO Auto-generated catch block
              e.printStackTrace();
          }
          
          return consumer;
      }
      
      public void removeMessageListener(String id)
      {
          MessageConsumer consumer = (MessageConsumer)consumers.get(id);
          
          try
          {
              if(durable && use_topic)
              {
                  session.unsubscribe(id);
              }
              consumer.close();
          } catch (JMSException e)
          {
              // TODO Auto-generated catch block
              e.printStackTrace();
          }
          consumers.remove(id);
      }
      
  
      /* (non-Javadoc)
       * @see org.apache.jetspeed.services.messaging.MessagingService#sendMessage(java.lang.Object)
       */
      public void sendMessage(Message message, String destination)
      {
          try
          {
              Destination producerDestination = getDestination(destination);
              MessageProducer producer = getProducer(destination);
              producer.send(producerDestination, message);//, deliveryMode, priority, 0);
          }
          catch (JMSException e)
          {
              // TODO Auto-generated catch block
              e.printStackTrace();
          }
      }
      
      protected Destination getDestination(String destination)
      {
          Destination jmsDestination = (Destination) destinations.get(destination);
          
          if(jmsDestination == null)
          {
  	        try
  	        {
  		        if (use_topic) {
  		            jmsDestination = session.createTopic(destination);
  				}
  				else {
  				    jmsDestination = session.createQueue(destination);
  				}
  		        
  		        destinations.put(destination, jmsDestination);
  	        }
  	        catch (JMSException e)
  	        {
  	            e.printStackTrace();
  	        }
          }
          
          return jmsDestination;
      }
      
      protected MessageProducer getProducer(String destination)
      {
          Destination producerDestination = (Destination) destinations.get(destination);
          MessageProducer producer = (MessageProducer) producers.get(destination);
          if(producer == null)
          {
              try
              {
                  producer = session.createProducer(producerDestination);
                  producer.setDeliveryMode(this.deliveryMode);
                  producer.setTimeToLive(this.timeToLive);
                  producer.setPriority(this.priority);
              }
              catch (JMSException e)
              {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
              }   
          }
          
          return producer;
      }
  
  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: jetspeed-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jetspeed-dev-help@jakarta.apache.org