You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jetspeed-dev@portals.apache.org by jf...@apache.org on 2004/07/23 02:24:01 UTC
cvs commit: jakarta-jetspeed/src/java/org/apache/jetspeed/services/messaging/jms AbstractJMSMessagingService.java
jford 2004/07/22 17:24:01
Added: src/java/org/apache/jetspeed/services/messaging
MessagingService.java
src/java/org/apache/jetspeed/services/messaging/jms/activemq
MessagingServiceImpl.java
src/java/org/apache/jetspeed/services/messaging/jms
AbstractJMSMessagingService.java
Log:
MessagingService implementation
Revision Changes Path
1.1 jakarta-jetspeed/src/java/org/apache/jetspeed/services/messaging/MessagingService.java
Index: MessagingService.java
===================================================================
/*
* Copyright 2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jetspeed.services.messaging;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
/**
 * Contract for the portal messaging service: registering listeners, creating
 * consumers and messages, and sending messages to named destinations.
 *
 * @author <a href="mailto:jford@apache.org">Jeremy Ford</a>
 * @version $Id: MessagingService.java,v 1.1 2004/07/23 00:24:01 jford Exp $
 */
public interface MessagingService
{
    /**
     * Service name constant (presumably the key used to look the service up;
     * confirm against the service registry configuration).
     */
    String SERVICE_NAME = "MessagingService";

    // ---- Destination names (subjects) for update notifications ----

    String USER_UPDATE_SUBJECT = "org.apache.jetspeed.services.messaging.user.update";
    String GROUP_UPDATE_SUBJECT = "org.apache.jetspeed.services.messaging.group.update";
    String ROLE_UPDATE_SUBJECT = "org.apache.jetspeed.services.messaging.role.update";
    String SECURITY_UPDATE_SUBJECT = "org.apache.jetspeed.services.messaging.security.update";
    String REGISTRY_UPDATE_SUBJECT = "org.apache.jetspeed.services.registry.update";

    // ---- Message type selectors accepted by createMessage(int) ----

    int MESSAGE = 0;
    int TEXT_MESSAGE = 1;
    int BYTE_MESSAGE = 2;
    int MAP_MESSAGE = 3;
    int OBJECT_MESSAGE = 4;
    int STREAM_MESSAGE = 5;

    /**
     * Registers a listener on a destination. A new consumer is created if one
     * with the given id has not been created yet. The consumer is owned by the
     * service and will be closed during the service's shutdown.
     *
     * @param listener    the listener that receives the messages
     * @param id          identifier under which the consumer is tracked
     * @param destination name of the destination to listen on
     */
    void addMessageListener(MessageListener listener, String id, String destination);

    /**
     * Creates a new MessageConsumer for a destination. Its lifecycle
     * (including closing it) is the responsibility of the caller.
     *
     * @param destination name of the destination to consume from
     * @return the newly created consumer
     */
    MessageConsumer createConsumer(String destination);

    /**
     * Removes the listener that was registered under the given id.
     *
     * @param id identifier passed to
     *           {@link #addMessageListener(MessageListener, String, String)}
     */
    void removeMessageListener(String id);

    /**
     * Sends a message to a destination.
     *
     * @param message     the message to send
     * @param destination name of the destination to send to
     */
    void sendMessage(Message message, String destination);

    /**
     * Creates an empty message of the requested kind.
     *
     * @param messageType one of {@link #MESSAGE}, {@link #TEXT_MESSAGE},
     *                    {@link #BYTE_MESSAGE}, {@link #MAP_MESSAGE},
     *                    {@link #OBJECT_MESSAGE} or {@link #STREAM_MESSAGE}
     * @return the created message
     */
    Message createMessage(int messageType);
}
1.1 jakarta-jetspeed/src/java/org/apache/jetspeed/services/messaging/jms/activemq/MessagingServiceImpl.java
Index: MessagingServiceImpl.java
===================================================================
/*
* Copyright 2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jetspeed.services.messaging.jms.activemq;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.jetspeed.services.messaging.jms.AbstractJMSMessagingService;
import org.codehaus.activemq.ActiveMQConnection;
import org.codehaus.activemq.ActiveMQConnectionFactory;
/**
 * ActiveMQ-backed messaging service. Supplies ActiveMQ specific defaults to
 * {@link AbstractJMSMessagingService} and opens the connection through an
 * {@link ActiveMQConnectionFactory}.
 *
 * @author <a href="mailto:jford@apache.org">Jeremy Ford</a>
 * @version $Id: MessagingServiceImpl.java,v 1.1 2004/07/23 00:24:01 jford Exp $
 */
public class MessagingServiceImpl extends AbstractJMSMessagingService
{
    protected static final String DEFAULT_FACTORY = "JmsTopicConnectionFactory";

    /** Configuration key holding the broker user name. */
    protected static final String CONFIG_USER = "user";
    /** Configuration key holding the broker password. */
    protected static final String CONFIG_PWD = "pwd";

    /** Credentials used when the configuration does not provide any. */
    protected static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;
    protected static final String DEFAULT_PWD = ActiveMQConnection.DEFAULT_PASSWORD;

    private String user;
    private String pwd;

    /**
     * Seeds the inherited default settings; they are consulted later by
     * {@link #initJMSConfiguration()} for every key missing from the
     * service configuration.
     */
    public MessagingServiceImpl()
    {
        DEFAULT_URL = ActiveMQConnection.DEFAULT_URL;

        DEFAULT_TRANSACTED = false;
        DEFAULT_ACK_MODE = Session.AUTO_ACKNOWLEDGE;
        DEFAULT_DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;

        DEFAULT_DURABLE = false;
        DEFAULT_NOLOCAL = true;
    }

    /**
     * Reads the common JMS settings through the superclass and then the
     * ActiveMQ credentials.
     */
    protected void initJMSConfiguration()
    {
        super.initJMSConfiguration();

        pwd = getConfiguration().getString(CONFIG_PWD, DEFAULT_PWD);
        user = getConfiguration().getString(CONFIG_USER, DEFAULT_USER);
    }

    /**
     * Opens the connection to the broker at the configured provider URL
     * using the configured credentials.
     *
     * @throws JMSException if the factory cannot create the connection
     */
    protected void createConnection() throws JMSException
    {
        ActiveMQConnectionFactory factory =
            new ActiveMQConnectionFactory(user, pwd, getProviderURL());
        connection = factory.createConnection();
    }

    /**
     * Creates the session on the open connection using the configured
     * transaction and acknowledgement settings.
     *
     * @throws JMSException if the session cannot be created
     */
    protected void createSession() throws JMSException
    {
        session = connection.createSession(transacted, ackMode);
    }
}
1.1 jakarta-jetspeed/src/java/org/apache/jetspeed/services/messaging/jms/AbstractJMSMessagingService.java
Index: AbstractJMSMessagingService.java
===================================================================
/*
* Copyright 2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jetspeed.services.messaging.jms;
import java.util.Hashtable;
import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.servlet.ServletConfig;
import org.apache.jetspeed.services.messaging.MessagingService;
import org.apache.turbine.services.InitializationException;
import org.apache.turbine.services.TurbineBaseService;
/**
 * Base class for JMS-backed {@link MessagingService} implementations.
 * <p>
 * It reads the JMS related settings from the service configuration, keeps one
 * connection and one session, and caches the destinations, consumers and
 * producers it creates. Subclasses provide the provider specific
 * {@link #createConnection()} and may overwrite the <code>DEFAULT_*</code>
 * fields (typically in their constructor) before the configuration is read.
 *
 * @author <a href="mailto:jford@apache.org">Jeremy Ford</a>
 * @version $Id: AbstractJMSMessagingService.java,v 1.1 2004/07/23 00:24:01 jford Exp $
 */
public abstract class AbstractJMSMessagingService extends TurbineBaseService
    implements MessagingService
{
    // ---- configuration keys ----
    protected static final String CONFIG_SCHEME = "scheme";
    protected static final String CONFIG_HOST = "host";
    protected static final String CONFIG_PORT = "port";
    protected static final String CONFIG_NAME = "name";
    protected static final String CONFIG_URL = "url";
    protected static final String CONFIG_CONNECTION_FACTORY = "connection_factory";
    protected static final String CONFIG_TRANSACTED = "transacted";
    protected static final String CONFIG_ACK_MODE = "ack_mode";
    protected static final String CONFIG_TOPIC = "topic_enabled";
    protected static final String CONFIG_DURABLE = "durable";
    protected static final String CONFIG_NOLOCAL = "nolocal";

    //producer properties
    protected static final String CONFIG_DELIVERY_MODE = "delivery_mode";
    protected static final String CONFIG_PRIORITY = "priority";
    protected static final String CONFIG_TIME_TO_LIVE = "time_to_live";

    // ---- fallback values ----
    // These are deliberately instance fields rather than constants:
    // subclasses assign provider specific values to them.
    protected String DEFAULT_SCHEME;
    protected String DEFAULT_HOST;
    protected String DEFAULT_PORT;
    protected String DEFAULT_NAME = "";
    protected String DEFAULT_URL;
    protected String DEFAULT_CONNECTION_FACTORY;
    protected boolean DEFAULT_TRANSACTED = false;
    protected int DEFAULT_ACK_MODE = Session.AUTO_ACKNOWLEDGE;
    protected boolean DEFAULT_TOPIC = true;
    protected boolean DEFAULT_DURABLE = false;
    protected boolean DEFAULT_NOLOCAL = true;
    protected int DEFAULT_DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    protected int DEFAULT_PRIORITY = 0; //0<x<10
    protected int DEFAULT_TIME_TO_LIVE = 0;

    //Acknowledgement keys
    protected static final String AUTO_ACKNOWLEDGE = "auto";
    protected static final String CLIENT_ACKNOWLEDGE = "client";
    protected static final String DUPS_OK_ACKNOWLEDGE = "dups_ok";

    //Delivery Mode Key
    protected static final String PERSISTENT = "persistent";

    //url properties
    protected String scheme;
    protected String host;
    protected String port;
    protected String name;
    protected String url;

    //connection factory properties
    protected String connectionFactory;
    /** Not read from the configuration here; subclasses may assign it. */
    protected String initialContextFactory;

    //default producer properties
    protected int deliveryMode;
    protected int priority;
    protected int timeToLive;

    //message properties
    protected boolean transacted = false;
    protected int ackMode;
    protected boolean use_topic;
    protected boolean durable;
    protected boolean nolocal;

    /** Destination name -> {@link Destination}. */
    protected Hashtable destinations = new Hashtable();
    /** Listener id -> {@link MessageConsumer} owned by this service. */
    protected Hashtable consumers = new Hashtable();
    /** Destination name -> {@link MessageProducer} owned by this service. */
    protected Hashtable producers = new Hashtable();

    protected Connection connection;
    protected Session session;

    /**
     * This is the early initialization method called by the
     * Turbine <code>Service</code> framework. It reads the configuration,
     * opens and starts the connection and creates the session.
     *
     * @param conf the servlet configuration (not used here)
     * @throws InitializationException if any step fails; the original
     *         exception is attached as the cause
     */
    public synchronized void init(ServletConfig conf) throws InitializationException
    {
        try
        {
            initJMSConfiguration();
            createConnection();
            connection.start();
            createSession();
        }
        catch(Exception e)
        {
            // Do not leave a half-opened connection behind when a later
            // step (start / createSession) fails.
            discardConnection();
            throw new InitializationException(e.getMessage(), e);
        }
    }

    /**
     * Closes and forgets the connection after a failed initialization.
     */
    private void discardConnection()
    {
        session = null;
        if(connection != null)
        {
            try
            {
                connection.close();
            }
            catch(JMSException e)
            {
                e.printStackTrace();
            }
            connection = null;
        }
    }

    /**
     * Loads every JMS setting from the service configuration, falling back
     * to the matching <code>DEFAULT_*</code> field when a key is absent.
     */
    protected void initJMSConfiguration()
    {
        scheme = getConfiguration().getString(CONFIG_SCHEME, DEFAULT_SCHEME);
        host = getConfiguration().getString(CONFIG_HOST, DEFAULT_HOST);
        port = getConfiguration().getString(CONFIG_PORT, DEFAULT_PORT);
        name = getConfiguration().getString(CONFIG_NAME, DEFAULT_NAME);
        url = getConfiguration().getString(CONFIG_URL, DEFAULT_URL);

        connectionFactory = getConfiguration().getString(CONFIG_CONNECTION_FACTORY, DEFAULT_CONNECTION_FACTORY);

        transacted = getConfiguration().getBoolean(CONFIG_TRANSACTED, DEFAULT_TRANSACTED);
        use_topic = getConfiguration().getBoolean(CONFIG_TOPIC, DEFAULT_TOPIC);
        durable = getConfiguration().getBoolean(CONFIG_DURABLE, DEFAULT_DURABLE);
        nolocal = getConfiguration().getBoolean(CONFIG_NOLOCAL, DEFAULT_NOLOCAL);

        ackMode = resolveAckMode(getConfiguration().getString(CONFIG_ACK_MODE));

        //producer related config
        deliveryMode = resolveDeliveryMode(getConfiguration().getString(CONFIG_DELIVERY_MODE));
        priority = getConfiguration().getInt(CONFIG_PRIORITY, DEFAULT_PRIORITY);
        timeToLive = getConfiguration().getInt(CONFIG_TIME_TO_LIVE, DEFAULT_TIME_TO_LIVE);
    }

    /**
     * Maps the configured acknowledgement keyword to the JMS constant.
     * A missing or unrecognized keyword yields {@link #DEFAULT_ACK_MODE}.
     */
    private int resolveAckMode(String ackModeStr)
    {
        if(AUTO_ACKNOWLEDGE.equals(ackModeStr))
        {
            return Session.AUTO_ACKNOWLEDGE;
        }
        if(CLIENT_ACKNOWLEDGE.equals(ackModeStr))
        {
            return Session.CLIENT_ACKNOWLEDGE;
        }
        if(DUPS_OK_ACKNOWLEDGE.equals(ackModeStr))
        {
            return Session.DUPS_OK_ACKNOWLEDGE;
        }
        return DEFAULT_ACK_MODE;
    }

    /**
     * Maps the configured delivery mode keyword to the JMS constant.
     * A missing keyword yields {@link #DEFAULT_DELIVERY_MODE}; any keyword
     * other than {@link #PERSISTENT} selects non-persistent delivery.
     */
    private int resolveDeliveryMode(String deliveryModeStr)
    {
        if(deliveryModeStr == null)
        {
            return DEFAULT_DELIVERY_MODE;
        }
        return deliveryModeStr.equals(PERSISTENT)
            ? DeliveryMode.PERSISTENT
            : DeliveryMode.NON_PERSISTENT;
    }

    /**
     * Returns the provider URL: the configured <code>url</code> if present,
     * otherwise <code>scheme://host:port/name</code> when all four parts
     * are available.
     *
     * @return the provider URL, or <code>null</code> if it cannot be built
     */
    protected String getProviderURL()
    {
        if(url != null)
        {
            return url;
        }

        boolean partsAvailable =
            (scheme != null) && (host != null) && (port != null) && (name != null);
        if(partsAvailable)
        {
            return scheme + "://" + host + ":" + port + "/" + name;
        }
        return null;
    }

    /**
     * Builds a JNDI context from the initial context factory and the
     * provider URL.
     *
     * @return the initial context
     * @throws NamingException if the context cannot be created
     */
    protected Context getContext() throws NamingException
    {
        Hashtable properties = new Hashtable();

        // Hashtable.put rejects null values with a NullPointerException, so
        // unset properties are left out and resolved by JNDI's own defaults.
        if(initialContextFactory != null)
        {
            properties.put(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
        }

        String providerURL = getProviderURL();
        if(providerURL != null)
        {
            properties.put(Context.PROVIDER_URL, providerURL);
        }

        return new InitialContext(properties);
    }

    /**
     * Creates the session on the open connection using the configured
     * transaction and acknowledgement settings.
     *
     * @throws JMSException if the session cannot be created
     */
    protected void createSession() throws JMSException
    {
        session = connection.createSession(transacted, ackMode);
    }

    /**
     * Opens the provider specific connection and stores it in
     * {@link #connection}.
     *
     * @throws JMSException if the connection cannot be opened
     */
    protected abstract void createConnection() throws JMSException;

    /**
     * Creates an empty message of the requested kind; any unknown type
     * yields a plain {@link Message}.
     *
     * @param messageType one of the message type constants of
     *                    {@link MessagingService}
     * @return the new message, or <code>null</code> if the session failed to
     *         create it (the failure is printed)
     * @see org.apache.jetspeed.services.messaging.MessagingService#createMessage(int)
     */
    public Message createMessage(int messageType)
    {
        try
        {
            switch(messageType)
            {
                case TEXT_MESSAGE:
                    return session.createTextMessage();
                case BYTE_MESSAGE:
                    return session.createBytesMessage();
                case MAP_MESSAGE:
                    return session.createMapMessage();
                case OBJECT_MESSAGE:
                    return session.createObjectMessage();
                case STREAM_MESSAGE:
                    return session.createStreamMessage();
                default:
                    return session.createMessage();
            }
        }
        catch(JMSException e)
        {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * This is the shutdown method called by the
     * Turbine <code>Service</code> framework. It closes the consumers and
     * producers owned by the service, then the session and the connection.
     * Every step is best effort: a failure is printed and the remaining
     * resources are still released.
     */
    public void shutdown()
    {
        closeConsumers();
        closeProducers();
        destinations.clear();

        // session/connection are null when init failed before creating them
        if(session != null)
        {
            try
            {
                System.out.println("closing session");
                session.close();
            }
            catch (JMSException e)
            {
                e.printStackTrace();
            }
        }

        if(connection != null)
        {
            try
            {
                System.out.println("closing connection");
                try
                {
                    connection.stop();
                }
                finally
                {
                    // close even when stop() failed
                    connection.close();
                }
            }
            catch (JMSException e1)
            {
                e1.printStackTrace();
            }
        }
    }

    /**
     * Closes every consumer registered through addMessageListener and
     * empties the consumer cache.
     */
    private void closeConsumers()
    {
        Iterator keyIter = consumers.keySet().iterator();
        while (keyIter.hasNext())
        {
            String id = (String) keyIter.next();
            MessageConsumer consumer = (MessageConsumer)consumers.get(id);
            try
            {
                System.out.println("Closing consumer for uniqueid " + id);
                consumer.close();
            }
            catch (JMSException e2)
            {
                e2.printStackTrace();
            }
        }
        consumers.clear();
    }

    /**
     * Closes every cached producer and empties the producer cache.
     */
    private void closeProducers()
    {
        Iterator producerKeyIter = producers.keySet().iterator();
        while (producerKeyIter.hasNext())
        {
            String id = (String) producerKeyIter.next();
            MessageProducer producer = (MessageProducer)producers.get(id);
            try
            {
                System.out.println("Closing producer for destinaton " + id);
                producer.close();
            }
            catch (JMSException e2)
            {
                e2.printStackTrace();
            }
        }
        producers.clear();
    }

    /**
     * Attaches a listener to a destination. The consumer registered under
     * <code>id</code> is reused if it exists; otherwise a durable subscriber
     * (topics with <code>durable</code> enabled) or a plain consumer is
     * created. The consumer is closed by {@link #shutdown()} or
     * {@link #removeMessageListener(String)}.
     *
     * @param listener    the listener that receives the messages
     * @param id          consumer id; also the durable subscription name
     * @param destination name of the destination to listen on
     */
    public void addMessageListener(MessageListener listener, String id, String destination)
    {
        try
        {
            Destination consumerDestination = getDestination(destination);
            if(consumerDestination == null)
            {
                // getDestination already reported why it failed
                System.err.println("Unable to add message listener " + id
                    + ": destination " + destination + " is not available");
                return;
            }

            MessageConsumer consumer = (MessageConsumer) consumers.get(id);
            if(consumer == null)
            {
                if(use_topic && durable)
                {
                    //topic, uniqueID, selector, nolocal
                    consumer = session.createDurableSubscriber((Topic)consumerDestination, id, "", nolocal);
                }
                else
                {
                    consumer = session.createConsumer(consumerDestination, "", nolocal);
                }
            }

            consumer.setMessageListener(listener);
            consumers.put(id, consumer);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Creates a consumer that is not tracked by this service; the caller
     * has to close it.
     *
     * @param destination name of the destination to consume from
     * @return the consumer, or <code>null</code> if it could not be created
     *         (the failure is printed)
     */
    public MessageConsumer createConsumer(String destination)
    {
        try
        {
            Destination consumerDestination = getDestination(destination);
            return session.createConsumer(consumerDestination);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Closes and forgets the consumer registered under <code>id</code> and,
     * for durable topic subscriptions, deletes the subscription. An unknown
     * id is ignored.
     *
     * @param id identifier passed to
     *           {@link #addMessageListener(MessageListener, String, String)}
     */
    public void removeMessageListener(String id)
    {
        MessageConsumer consumer = (MessageConsumer)consumers.remove(id);
        if(consumer == null)
        {
            return;
        }

        try
        {
            // The consumer has to be closed first: per the JMS API a durable
            // subscription must not be deleted while it has an active consumer.
            consumer.close();
            if(durable && use_topic)
            {
                session.unsubscribe(id);
            }
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Sends a message to the named destination using the cached producer
     * for that destination.
     *
     * @param message     the message to send
     * @param destination name of the destination to send to
     * @see org.apache.jetspeed.services.messaging.MessagingService#sendMessage(Message, String)
     */
    public void sendMessage(Message message, String destination)
    {
        try
        {
            MessageProducer producer = getProducer(destination);
            if(producer == null)
            {
                // getProducer already reported why it failed
                System.err.println("Unable to send message: no producer available for destination "
                    + destination);
                return;
            }

            // The producer is bound to its destination at creation time, so
            // the destination must not be passed again: per the JMS API,
            // send(Destination, Message) is reserved for unidentified producers.
            producer.send(message);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Looks up the destination with the given name, creating and caching a
     * topic or queue (depending on <code>use_topic</code>) on first use.
     *
     * @param destination the destination name
     * @return the destination, or <code>null</code> if the session failed to
     *         create it (the failure is printed)
     */
    protected Destination getDestination(String destination)
    {
        Destination jmsDestination = (Destination) destinations.get(destination);
        if(jmsDestination != null)
        {
            return jmsDestination;
        }

        try
        {
            if(use_topic)
            {
                jmsDestination = session.createTopic(destination);
            }
            else
            {
                jmsDestination = session.createQueue(destination);
            }
            destinations.put(destination, jmsDestination);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
        return jmsDestination;
    }

    /**
     * Returns the producer bound to the named destination, creating,
     * configuring (delivery mode, time to live, priority) and caching it on
     * first use. Cached producers are closed by {@link #shutdown()}.
     *
     * @param destination the destination name
     * @return the producer, or <code>null</code> if the destination or the
     *         producer could not be created (the failure is printed)
     */
    protected MessageProducer getProducer(String destination)
    {
        MessageProducer producer = (MessageProducer) producers.get(destination);
        if(producer != null)
        {
            return producer;
        }

        Destination producerDestination = getDestination(destination);
        if(producerDestination == null)
        {
            return null;
        }

        try
        {
            producer = session.createProducer(producerDestination);
            producer.setDeliveryMode(this.deliveryMode);
            producer.setTimeToLive(this.timeToLive);
            producer.setPriority(this.priority);

            // Remember the producer so it is reused by later sends and
            // closed during shutdown.
            producers.put(destination, producer);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
            if(producer != null)
            {
                // Do not hand out (or leak) a half-configured producer.
                try
                {
                    producer.close();
                }
                catch (JMSException closeFailure)
                {
                    closeFailure.printStackTrace();
                }
                producer = null;
            }
        }
        return producer;
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: jetspeed-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jetspeed-dev-help@jakarta.apache.org