You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/12/08 19:15:41 UTC
svn commit: r724432 [4/4] - in
/webservices/commons/trunk/modules/transport/modules:
base/src/main/java/org/apache/axis2/transport/base/ jms/
jms/src/main/java/org/apache/axis2/transport/jms/
testkit/src/main/java/org/apache/axis2/transport/testkit/tes...
Added: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=724432&view=auto
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (added)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Mon Dec 8 10:15:40 2008
@@ -0,0 +1,1205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.axis2.transport.base.BaseConstants;
+import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.InitialContext;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.transaction.UserTransaction;
+import javax.transaction.NotSupportedException;
+import javax.transaction.SystemException;
+import java.util.*;
+
+/**
+ * Each service will have one ServiceTaskManager instance that will create, manage and also destroy
+ * idle tasks created for it, for message receipt. This will also allow individual tasks to cache
+ * the Connection, Session or Consumer as necessary, considering the transactionality required and
+ * user preference.
+ *
+ * This also acts as the ExceptionListener for all JMS connections made on behalf of the service.
+ * Since the ExceptionListener is notified by a JMS provider on a "serious" error, we simply try
+ * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh
+ * for the service, by discarding all connections.
+ */
+public class ServiceTaskManager {
+
+ /** The logger */
+ private static final Log log = LogFactory.getLog(ServiceTaskManager.class);
+
+ /** The Task manager is stopped or has not started */
+ private static final int STATE_STOPPED = 0;
+ /** The Task manager is started and active */
+ private static final int STATE_STARTED = 1;
+ /** The Task manager is paused temporarily */
+ private static final int STATE_PAUSED = 2;
+ /** The Task manager is started, but a shutdown has been requested */
+ private static final int STATE_SHUTTING_DOWN = 3;
+ /** The Task manager has encountered an error */
+ private static final int STATE_FAILURE = 4;
+
+ /** The name of the service managed by this instance */
+ private String serviceName;
+ /** The ConnectionFactory MUST refer to an XAConnectionFactory to use JTA */
+ private String connFactoryJNDIName;
+ /** The JNDI name of the Destination Queue or Topic */
+ private String destinationJNDIName;
+ /** JNDI location for the JTA UserTransaction */
+ private String userTransactionJNDIName = "java:comp/UserTransaction";
+ /** The type of destination - P2P or PubSub (or JMS 1.1 API generic?) */
+ private int destinationType = JMSConstants.GENERIC;
+ /** An optional message selector */
+ private String messageSelector = null;
+
+ /** Should tasks run without transactions, using transacted Sessions (i.e. local), or JTA */
+ private int transactionality = BaseConstants.TRANSACTION_NONE;
+ /** Should created Sessions be transactional ? - should be false when using JTA */
+ private boolean sessionTransacted = true;
+ /** Session acknowledgement mode when transacted Sessions (i.e. local transactions) are used */
+ private int sessionAckMode = Session.AUTO_ACKNOWLEDGE;
+
+ /** Is the subscription durable ? */
+ private boolean subscriptionDurable = false;
+ /** The name of the durable subscriber for this client */
+ private String durableSubscriberName = null;
+ /** In PubSub mode, should I receive messages sent by me / my connection ? */
+ private boolean pubSubNoLocal = false;
+ /** Number of concurrent consumers - for PubSub, this should be 1 to prevent multiple receipt */
+ private int concurrentConsumers = 1;
+ /** Maximum number of consumers to create - see @concurrentConsumers */
+ private int maxConcurrentConsumers = 1;
+ /** The number of idle (i.e. message-less) attempts to be tried before suicide, to scale down */
+ private int idleTaskExecutionLimit = 10;
+ /** The maximum number of successful message receipts for a task - to limit thread life span */
+ private int maxMessagesPerTask = -1; // default is unlimited
+ /** The default receive timeout - a negative value means wait forever, zero dont wait at all */
+ private int receiveTimeout = 1000;
+ /** JMS Resource cache level - Connection, Session, Consumer. Auto will select safe default */
+ private int cacheLevel = JMSConstants.CACHE_AUTO;
+ /** Should we cache the UserTransaction handle from JNDI - true for almost all app servers */
+ private boolean cacheUserTransaction = true;
+ /** Shared UserTransactionHandle */
+ private UserTransaction sharedUserTransaction = null;
+ /** Should this service use JMS 1.1 ? (when false, defaults to 1.0.2b) */
+ private boolean jmsSpec11 = true;
+
+ /** Initial duration to attempt re-connection to JMS provider after failure */
+ private int initialReconnectDuration = 10000;
+ /** Progression factory for geometric series that calculates re-connection times */
+ private double reconnectionProgressionFactor = 2.0; // default to [bounded] exponential
+ /** Upper limit on reconnection attempt duration */
+ private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour
+
+ /** The JNDI context properties and other general properties */
+ private Hashtable<String,String> jmsProperties = new Hashtable<String, String>();
+ /** The JNDI Context acuired */
+ private Context context = null;
+ /** The ConnectionFactory to be used */
+ private ConnectionFactory conFactory = null;
+ /** The JMS Destination */
+ private Destination destination = null;
+
+ /** The list of active tasks thats managed by this instance */
+ private final List<MessageListenerTask> pollingTasks =
+ Collections.synchronizedList(new ArrayList<MessageListenerTask>());
+ /** The per-service JMS message receiver to be invoked after receipt of messages */
+ private JMSMessageReceiver jmsMessageReceiver = null;
+
+ /** State of this Task Manager */
+ private volatile int serviceTaskManagerState = STATE_STOPPED;
+ /** Number of invoker tasks active */
+ private volatile int activeTaskCount = 0;
+ /** The shared thread pool from the Listener */
+ private WorkerPool workerPool = null;
+
+ /** The JMS Connection shared between multiple polling tasks - when enabled (reccomended) */
+ private Connection sharedConnection = null;
+
+ /**
+ * Start or re-start the Task Manager by shutting down any existing worker tasks and
+ * re-creating them. However, if this is STM is PAUSED, a start request is ignored.
+ * This applies for any connection failures during paused state as well, which then will
+ * not try to auto recover
+ */
+ public synchronized void start() {
+
+ if (serviceTaskManagerState == STATE_PAUSED) {
+ log.info("Attempt to re-start paused TaskManager is ignored. Please use resume instead");
+ return;
+ }
+
+ // if any tasks are running, stop whats running now
+ if (!pollingTasks.isEmpty()) {
+ stop();
+ }
+
+ if (cacheLevel == JMSConstants.CACHE_AUTO) {
+ cacheLevel =
+ transactionality == BaseConstants.TRANSACTION_NONE ?
+ JMSConstants.CACHE_CONSUMER : JMSConstants.CACHE_NONE;
+ }
+ switch (cacheLevel) {
+ case JMSConstants.CACHE_NONE:
+ log.debug("No JMS resources will be cached/shared between poller " +
+ "worker tasks of service : " + serviceName);
+ break;
+ case JMSConstants.CACHE_CONNECTION:
+ log.debug("Only the JMS Connection will be cached and shared between *all* " +
+ "poller task invocations");
+ break;
+ case JMSConstants.CACHE_SESSION:
+ log.debug("The JMS Connection and Session will be cached and shared between " +
+ "successive poller task invocations");
+ break;
+ case JMSConstants.CACHE_CONSUMER:
+ log.debug("The JMS Connection, Session and MessageConsumer will be cached and " +
+ "shared between successive poller task invocations");
+ break;
+ default : {
+ handleException("Invalid cache level : " + cacheLevel +
+ " for service : " + serviceName);
+ }
+ }
+
+ for (int i=0; i<concurrentConsumers; i++) {
+ workerPool.execute(new MessageListenerTask());
+ }
+
+ serviceTaskManagerState = STATE_STARTED;
+ log.info("Task manager for service : " + serviceName + " [re-]initialized");
+ }
+
+ /**
+ * Shutdown the tasks and release any shared resources
+ */
+ public synchronized void stop() {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Stopping ServiceTaskManager for service : " + serviceName);
+ }
+
+ if (serviceTaskManagerState != STATE_FAILURE) {
+ serviceTaskManagerState = STATE_SHUTTING_DOWN;
+ }
+
+ synchronized(pollingTasks) {
+ for (MessageListenerTask lstTask : pollingTasks) {
+ lstTask.requestShutdown();
+ }
+ }
+
+ // try to wait a bit for task shutdown
+ for (int i=0; i<5; i++) {
+ if (activeTaskCount == 0) {
+ break;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {}
+ }
+
+ if (sharedConnection != null) {
+ try {
+ sharedConnection.stop();
+ } catch (JMSException e) {
+ logError("Error stopping shared Connection", e);
+ } finally {
+ sharedConnection = null;
+ }
+ }
+
+ if (activeTaskCount > 0) {
+ log.warn("Unable to shutdown all polling tasks of service : " + serviceName);
+ }
+
+ if (serviceTaskManagerState != STATE_FAILURE) {
+ serviceTaskManagerState = STATE_STOPPED;
+ }
+ log.info("Task manager for service : " + serviceName + " shutdown");
+ }
+
+ /**
+ * Temporarily suspend receipt and processing of messages. Accomplished by stopping the
+ * connection / or connections used by the poller tasks
+ */
+ public synchronized void pause() {
+ for (MessageListenerTask lstTask : pollingTasks) {
+ lstTask.pause();
+ }
+ if (sharedConnection != null) {
+ try {
+ sharedConnection.stop();
+ } catch (JMSException e) {
+ logError("Error pausing shared Connection", e);
+ }
+ }
+ }
+
+ /**
+ * Resume receipt and processing of messages of paused tasks
+ */
+ public synchronized void resume() {
+ for (MessageListenerTask lstTask : pollingTasks) {
+ lstTask.resume();
+ }
+ if (sharedConnection != null) {
+ try {
+ sharedConnection.start();
+ } catch (JMSException e) {
+ logError("Error resuming shared Connection", e);
+ }
+ }
+ }
+
+ /**
+ * Start a new MessageListenerTask if we are still active, the threshold is not reached, and w
+ * e do not have any idle tasks - i.e. scale up listening
+ */
+ private void scheduleNewTaskIfAppropriate() {
+ if (serviceTaskManagerState == STATE_STARTED &&
+ pollingTasks.size() < getMaxConcurrentConsumers() && getIdleTaskCount() == 0) {
+ workerPool.execute(new MessageListenerTask());
+ }
+ }
+
+ /**
+ * Get the number of MessageListenerTasks that are currently idle
+ * @return idle task count
+ */
+ private int getIdleTaskCount() {
+ int count = 0;
+ for (MessageListenerTask lstTask : pollingTasks) {
+ if (lstTask.isTaskIdle()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Get the number of MessageListenerTasks that are currently connected to the JMS provider
+ * @return connected task count
+ */
+ private int getConnectedTaskCount() {
+ int count = 0;
+ for (MessageListenerTask lstTask : pollingTasks) {
+ if (lstTask.isConnected()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * The actual threads/tasks that perform message polling
+ */
+ private class MessageListenerTask implements Runnable, ExceptionListener {
+
+ /** The Connection used by the polling task */
+ private Connection connection = null;
+ /** The Sesson used by the polling task */
+ private Session session = null;
+ /** The MessageConsumer used by the polling task */
+ private MessageConsumer consumer = null;
+ /** State of the worker polling task */
+ private volatile int workerState = STATE_STOPPED;
+ /** The number of idle (i.e. without fetching a message) polls for this task */
+ private int idleExecutionCount = 0;
+ /** Is this task idle right now? */
+ private volatile boolean idle = false;
+ /** Is this task connected to the JMS provider successfully? */
+ private boolean connected = false;
+
+ /** As soon as we create a new polling task, add it to the STM for control later */
+ MessageListenerTask() {
+ synchronized(pollingTasks) {
+ pollingTasks.add(this);
+ }
+ }
+
+ /**
+ * Pause this polling worker task
+ */
+ public void pause() {
+ if (isActive()) {
+ if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ try {
+ connection.stop();
+ } catch (JMSException e) {
+ log.warn("Error pausing Message Listener task for service : " + serviceName);
+ }
+ }
+ workerState = STATE_PAUSED;
+ }
+ }
+
+ /**
+ * Resume this polling task
+ */
+ public void resume() {
+ if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ try {
+ connection.start();
+ } catch (JMSException e) {
+ log.warn("Error resuming Message Listener task for service : " + serviceName);
+ }
+ }
+ workerState = STATE_STARTED;
+ }
+
+ /**
+ * Execute the polling worker task
+ */
+ public void run() {
+ workerState = STATE_STARTED;
+ activeTaskCount++;
+ int messageCount = 0;
+
+ if (log.isDebugEnabled()) {
+ log.debug("New poll task starting : thread id = " + Thread.currentThread().getId());
+ }
+
+ try {
+ while (isActive() &&
+ (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) &&
+ (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) {
+
+ UserTransaction ut = null;
+ try {
+ if (transactionality == BaseConstants.TRANSACTION_JTA) {
+ ut = getUserTransaction();
+ ut.begin();
+ }
+ } catch (NotSupportedException e) {
+ handleException("Listener Task is already associated with a transaction", e);
+ } catch (SystemException e) {
+ handleException("Error starting a JTA transaction", e);
+ }
+
+ // Get a message by polling, or receive null
+ Message message = receiveMessage();
+
+ if (log.isTraceEnabled()) {
+ if (message != null) {
+ try {
+ log.trace("<<<<<<< READ message with Message ID : " +
+ message.getJMSMessageID() + " from : " + destination +
+ " by Thread ID : " + Thread.currentThread().getId());
+ } catch (JMSException ignore) {}
+ } else {
+ log.trace("No message received by Thread ID : " +
+ Thread.currentThread().getId() + " for destination : " + destination);
+ }
+ }
+
+ if (message != null) {
+ idle = false;
+ idleExecutionCount = 0;
+ messageCount++;
+ // I will be busy now while processing this message, so start another if needed
+ scheduleNewTaskIfAppropriate();
+ handleMessage(message, ut);
+
+ } else {
+ idle = true;
+ idleExecutionCount++;
+ }
+ }
+
+ } finally {
+ workerState = STATE_STOPPED;
+ activeTaskCount--;
+ synchronized(pollingTasks) {
+ pollingTasks.remove(this);
+ }
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() +
+ " is stopping after processing : " + messageCount + " messages :: " +
+ " isActive : " + isActive() + " maxMessagesPerTask : " +
+ getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() +
+ " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " +
+ getIdleTaskExecutionLimit());
+ } else if (log.isDebugEnabled()) {
+ log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() +
+ " is stopping after processing : " + messageCount + " messages");
+ }
+
+ closeConsumer(true);
+ closeSession(true);
+ closeConnection();
+
+ // My time is up, so if I am going away, create another
+ scheduleNewTaskIfAppropriate();
+ }
+
+ /**
+ * Poll for and return a message if available
+ *
+ * @return a message read, or null
+ */
+ private Message receiveMessage() {
+
+ // get a new connection, session and consumer to prevent a conflict.
+ // If idle, it means we can re-use what we already have
+ if (consumer == null) {
+ connection = getConnection();
+ session = getSession();
+ consumer = getMessageConsumer();
+ if (log.isDebugEnabled()) {
+ log.debug("Preparing a Connection, Session and Consumer to read messages");
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Waiting for a message for service : " + serviceName + " - duration : "
+ + (getReceiveTimeout() < 0 ? "unlimited" : (getReceiveTimeout() + "ms")));
+ }
+
+ try {
+ if (getReceiveTimeout() < 0) {
+ return consumer.receive();
+ } else {
+ return consumer.receive(getReceiveTimeout());
+ }
+ } catch (IllegalStateException ignore) {
+ // probably the consumer (shared) was closed.. which is still ok.. as we didn't read
+ } catch (JMSException e) {
+ logError("Error receiving message for service : " + serviceName, e);
+ }
+ return null;
+ }
+
+ /**
+ * Invoke ultimate message handler/listener and ack message and/or
+ * commit/rollback transactions
+ * @param message the JMS message received
+ * @param ut the UserTransaction used to receive this message, or null
+ */
+ private void handleMessage(Message message, UserTransaction ut) {
+
+ String messageId = null;
+ try {
+ messageId = message.getJMSMessageID();
+ } catch (JMSException ignore) {}
+
+ boolean commitOrAck = true;
+ try {
+ commitOrAck = jmsMessageReceiver.onMessage(message, ut);
+
+ } finally {
+
+ // if client acknowledgement is selected, and processing requested ACK
+ if (commitOrAck && getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) {
+ try {
+ message.acknowledge();
+ if (log.isDebugEnabled()) {
+ log.debug("Message : " + messageId + " acknowledged");
+ }
+ } catch (JMSException e) {
+ logError("Error acknowledging message : " + messageId, e);
+ }
+ }
+
+ // close the consumer
+ closeConsumer(false);
+
+ // if session was transacted, commit it or rollback
+ try {
+ if (session.getTransacted()) {
+ if (commitOrAck) {
+ session.commit();
+ if (log.isDebugEnabled()) {
+ log.debug("Session for message : " + messageId + " committed");
+ }
+ } else {
+ session.rollback();
+ if (log.isDebugEnabled()) {
+ log.debug("Session for message : " + messageId + " rolled back");
+ }
+ }
+ }
+ } catch (JMSException e) {
+ logError("Error " + (commitOrAck ? "committing" : "rolling back") +
+ " local session txn for message : " + messageId, e);
+ }
+
+ // if a JTA transaction was being used, commit it or rollback
+ try {
+ if (ut != null) {
+ if (commitOrAck) {
+ ut.commit();
+ if (log.isDebugEnabled()) {
+ log.debug("JTA txn for message : " + messageId + " committed");
+ }
+ } else {
+ ut.rollback();
+ if (log.isDebugEnabled()) {
+ log.debug("JTA txn for message : " + messageId + " rolled back");
+ }
+ }
+ }
+ } catch (Exception e) {
+ logError("Error " + (commitOrAck ? "committing" : "rolling back") +
+ " JTA txn for message : " + messageId + " from the session", e);
+ }
+
+ closeSession(false);
+ closeConnection();
+ }
+ }
+
+ /** Handle JMS Connection exceptions by re-initializing. A single connection failure could
+ * cause re-initialization of multiple MessageListenerTasks / Connections
+ */
+ public void onException(JMSException j) {
+
+ if (!isSTMActive()) {
+ requestShutdown();
+ return;
+ }
+
+ log.warn("JMS Connection failure : " + j.getMessage());
+ setConnected(false);
+
+ if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ // failed Connection was not shared, thus no need to restart the whole STM
+ requestShutdown();
+ return;
+ }
+
+ // if we failed while active, update state to show failure
+ setServiceTaskManagerState(STATE_FAILURE);
+ log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks");
+
+ int r = 1;
+ long retryDuration = initialReconnectDuration;
+
+ do {
+ try {
+ log.info("Reconnection attempt : " + r + " for service : " + serviceName);
+ start();
+ } catch (Exception ignore) {}
+
+ boolean connected = false;
+ for (int i=0; i<5; i++) {
+ if (getConnectedTaskCount() == concurrentConsumers) {
+ connected = true;
+ break;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {}
+ }
+
+ if (!connected) {
+ log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName +
+ " failed. Next retry in " + (retryDuration/1000) + "seconds");
+ retryDuration = (long) (retryDuration * reconnectionProgressionFactor);
+ if (retryDuration > maxReconnectDuration) {
+ retryDuration = maxReconnectDuration;
+ }
+
+ try {
+ Thread.sleep(retryDuration);
+ } catch (InterruptedException ignore) {}
+ }
+
+ } while (!isSTMActive() || getConnectedTaskCount() < concurrentConsumers);
+ }
+
+ protected void requestShutdown() {
+ workerState = STATE_SHUTTING_DOWN;
+ }
+
+ private boolean isActive() {
+ return workerState == STATE_STARTED;
+ }
+
+ protected boolean isTaskIdle() {
+ return idle;
+ }
+
+ public boolean isConnected() {
+ return connected;
+ }
+
+ public void setConnected(boolean connected) {
+ this.connected = connected;
+ }
+
+ /**
+ * Get a Connection that could/should be used by this task - depends on the cache level to reuse
+ * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection
+ */
+ private Connection getConnection() {
+ if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ // Connection is not shared
+ if (connection == null) {
+ connection = createConnection();
+ }
+ } else {
+ if (sharedConnection != null) {
+ connection = sharedConnection;
+ } else {
+ synchronized(this) {
+ if (sharedConnection == null) {
+ sharedConnection = createConnection();
+ }
+ connection = sharedConnection;
+ }
+ }
+ }
+ setConnected(true);
+ return connection;
+ }
+
+ /**
+ * Get a Session that could/should be used by this task - depends on the cache level to reuse
+ * @param connection the connection (could be the shared connection) to use to create a Session
+ * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session
+ * created using the Connection passed, or a new/shared connection
+ */
+ private Session getSession() {
+ if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) {
+ session = createSession();
+ }
+ return session;
+ }
+
+ /**
+ * Get a MessageConsumer that chould/should be used by this task - depends on the cache
+ * level to reuse
+ * @param connection option Connection to be used
+ * @param session optional Session to be used
+ * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new
+ * MessageConsumer possibly using the Connection and Session passed in
+ */
+ private MessageConsumer getMessageConsumer() {
+ if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) {
+ consumer = createConsumer();
+ }
+ return consumer;
+ }
+
+ /**
+ * Close the given Connection, hiding exceptions if any which are logged
+ * @param connection the Connection to be closed
+ */
+ private void closeConnection() {
+ if (connection != null &&
+ cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing non-shared JMS connection for service : " + serviceName);
+ }
+ connection.close();
+ } catch (JMSException e) {
+ logError("Error closing JMS connection", e);
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ /**
+ * Close the given Session, hiding exceptions if any which are logged
+ * @param session the Session to be closed
+ */
+ private void closeSession(boolean forced) {
+ if (session != null &&
+ (cacheLevel < JMSConstants.CACHE_SESSION || forced)) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing non-shared JMS session for service : " + serviceName);
+ }
+ session.close();
+ } catch (JMSException e) {
+ logError("Error closing JMS session", e);
+ } finally {
+ session = null;
+ }
+ }
+ }
+
+ /**
+ * Close the given Consumer, hiding exceptions if any which are logged
+ * @param consumer the Consumer to be closed
+ */
+ private void closeConsumer(boolean forced) {
+ if (consumer != null &&
+ (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing non-shared JMS consumer for service : " + serviceName);
+ }
+ consumer.close();
+ } catch (JMSException e) {
+ logError("Error closing JMS consumer", e);
+ } finally {
+ consumer = null;
+ }
+ }
+ }
+
+ /**
+ * Create a new Connection for this STM, using JNDI properties and credentials provided
+ * @return a new Connection for this STM, using JNDI properties and credentials provided
+ */
+ private Connection createConnection() {
+
+ try {
+ conFactory = JMSUtils.lookup(
+ getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName());
+ log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName());
+ } catch (NamingException e) {
+ handleException("Error looking up connection factory : " + getConnFactoryJNDIName() +
+ " using JNDI properties : " + jmsProperties, e);
+ }
+
+ Connection connection = null;
+ try {
+ connection = JMSUtils.createConnection(
+ conFactory,
+ jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME),
+ jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD),
+ isJmsSpec11(), isQueue());
+
+ connection.setExceptionListener(this);
+ connection.start();
+ log.info("JMS Connection for service : " + serviceName + " created and started");
+
+ } catch (JMSException e) {
+ handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() +
+ " using JNDI properties : " + jmsProperties, e);
+ }
+ return connection;
+ }
+
+ /**
+ * Create a new Session for this STM
+ * @param connection the Connection to be used
+ * @return a new Session created using the Connection passed in
+ */
+ private Session createSession() {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS Session for service : " + serviceName);
+ }
+ return JMSUtils.createSession(
+ connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue());
+
+ } catch (JMSException e) {
+ handleException("Error creating JMS session for service : " + serviceName, e);
+ }
+ return null;
+ }
+
+ /**
+ * Create a new MessageConsumer for this STM
+ * @param session the Session to be used
+ * @return a new MessageConsumer created using the Session passed in
+ */
+ private MessageConsumer createConsumer() {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS MessageConsumer for service : " + serviceName);
+ }
+
+ return JMSUtils.createConsumer(
+ session, getDestination(session), isQueue(),
+ (isSubscriptionDurable() && getDurableSubscriberName() == null ?
+ getDurableSubscriberName() : serviceName),
+ getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11());
+
+ } catch (JMSException e) {
+ handleException("Error creating JMS consumer for service : " + serviceName,e);
+ }
+ return null;
+ }
+ }
+
+ // -------------- mundane private methods ----------------
+ /**
+ * Get the InitialContext for lookup using the JNDI parameters applicable to the service
+ * @return the InitialContext to be used
+ * @throws NamingException
+ */
+ private Context getInitialContext() throws NamingException {
+ if (context == null) {
+ context = new InitialContext(jmsProperties);
+ }
+ return context;
+ }
+
+ /**
+ * Return the JMS Destination for the JNDI name of the Destination from the InitialContext
+ * @return the JMS Destination to which this STM listens for messages
+ */
+ private Destination getDestination(Session session) {
+ if (destination == null) {
+ try {
+ context = getInitialContext();
+ destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName());
+ if (log.isDebugEnabled()) {
+ log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() +
+ " found for service " + serviceName);
+ }
+ } catch (NamingException e) {
+ try {
+ switch (destinationType) {
+ case JMSConstants.QUEUE: {
+ destination = session.createQueue(getDestinationJNDIName());
+ break;
+ }
+ case JMSConstants.TOPIC: {
+ destination = session.createTopic(getDestinationJNDIName());
+ break;
+ }
+ default: {
+ handleException("Error looking up JMS destination : " +
+ getDestinationJNDIName() + " using JNDI properties : " +
+ jmsProperties, e);
+ }
+ }
+ } catch (JMSException j) {
+ handleException("Error looking up and creating JMS destination : " +
+ getDestinationJNDIName() + " using JNDI properties : " + jmsProperties, e);
+ }
+ }
+ }
+ return destination;
+ }
+
+ /**
+ * The UserTransaction to be used, looked up from the JNDI
+ * @return The UserTransaction to be used, looked up from the JNDI
+ */
+ private UserTransaction getUserTransaction() {
+ if (!cacheUserTransaction) {
+ if (log.isDebugEnabled()) {
+ log.debug("Acquiring a new UserTransaction for service : " + serviceName);
+ }
+
+ try {
+ context = getInitialContext();
+ return
+ JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName());
+ } catch (NamingException e) {
+ handleException("Error looking up UserTransaction : " + getDestinationJNDIName() +
+ " using JNDI properties : " + jmsProperties, e);
+ }
+ }
+
+ if (sharedUserTransaction == null) {
+ try {
+ context = getInitialContext();
+ sharedUserTransaction =
+ JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName());
+ if (log.isDebugEnabled()) {
+ log.debug("Acquired shared UserTransaction for service : " + serviceName);
+ }
+ } catch (NamingException e) {
+ handleException("Error looking up UserTransaction : " + getDestinationJNDIName() +
+ " using JNDI properties : " + jmsProperties, e);
+ }
+ }
+ return sharedUserTransaction;
+ }
+
+ // -------------------- trivial methods ---------------------
+ private boolean isSTMActive() {
+ return serviceTaskManagerState == STATE_STARTED;
+ }
+
+ /**
+ * Is this STM bound to a Queue, Topic or a JMS 1.1 Generic Destination?
+ * @return TRUE for a Queue, FALSE for a Topic and NULL for a Generic Destination
+ */
+ private Boolean isQueue() {
+ if (destinationType == JMSConstants.GENERIC) {
+ return null;
+ } else {
+ return destinationType == JMSConstants.QUEUE;
+ }
+ }
+
+ private void logError(String msg, Exception e) {
+ log.error(msg, e);
+ }
+
+ private void handleException(String msg, Exception e) {
+ log.error(msg, e);
+ throw new AxisJMSException(msg, e);
+ }
+
+ private void handleException(String msg) {
+ log.error(msg);
+ throw new AxisJMSException(msg);
+ }
+
+ // -------------- getters and setters ------------------
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public String getConnFactoryJNDIName() {
+ return connFactoryJNDIName;
+ }
+
+ public void setConnFactoryJNDIName(String connFactoryJNDIName) {
+ this.connFactoryJNDIName = connFactoryJNDIName;
+ }
+
+ public String getDestinationJNDIName() {
+ return destinationJNDIName;
+ }
+
+ public void setDestinationJNDIName(String destinationJNDIName) {
+ this.destinationJNDIName = destinationJNDIName;
+ }
+
+ public int getDestinationType() {
+ return destinationType;
+ }
+
+ public void setDestinationType(int destinationType) {
+ this.destinationType = destinationType;
+ }
+
+ public String getMessageSelector() {
+ return messageSelector;
+ }
+
+ public void setMessageSelector(String messageSelector) {
+ this.messageSelector = messageSelector;
+ }
+
+ public int getTransactionality() {
+ return transactionality;
+ }
+
+ public void setTransactionality(int transactionality) {
+ this.transactionality = transactionality;
+ sessionTransacted = (transactionality == BaseConstants.TRANSACTION_LOCAL);
+ }
+
+ public boolean isSessionTransacted() {
+ return sessionTransacted;
+ }
+
+ public void setSessionTransacted(Boolean sessionTransacted) {
+ if (sessionTransacted != null) {
+ this.sessionTransacted = sessionTransacted;
+ // sesstionTransacted means local transactions are used, however !sessionTransacted does
+ // not mean that JTA is used
+ if (sessionTransacted) {
+ transactionality = BaseConstants.TRANSACTION_LOCAL;
+ }
+ }
+ }
+
+ public int getSessionAckMode() {
+ return sessionAckMode;
+ }
+
+ public void setSessionAckMode(int sessionAckMode) {
+ this.sessionAckMode = sessionAckMode;
+ }
+
+ public boolean isSubscriptionDurable() {
+ return subscriptionDurable;
+ }
+
+ public void setSubscriptionDurable(Boolean subscriptionDurable) {
+ if (subscriptionDurable != null) {
+ this.subscriptionDurable = subscriptionDurable;
+ }
+ }
+
+ public String getDurableSubscriberName() {
+ return durableSubscriberName;
+ }
+
+ public void setDurableSubscriberName(String durableSubscriberName) {
+ this.durableSubscriberName = durableSubscriberName;
+ }
+
+ public boolean isPubSubNoLocal() {
+ return pubSubNoLocal;
+ }
+
+ public void setPubSubNoLocal(Boolean pubSubNoLocal) {
+ if (pubSubNoLocal != null) {
+ this.pubSubNoLocal = pubSubNoLocal;
+ }
+ }
+
+ public int getConcurrentConsumers() {
+ return concurrentConsumers;
+ }
+
+ public void setConcurrentConsumers(int concurrentConsumers) {
+ this.concurrentConsumers = concurrentConsumers;
+ }
+
+ public int getMaxConcurrentConsumers() {
+ return maxConcurrentConsumers;
+ }
+
+ public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
+ this.maxConcurrentConsumers = maxConcurrentConsumers;
+ }
+
+ public int getIdleTaskExecutionLimit() {
+ return idleTaskExecutionLimit;
+ }
+
+ public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
+ this.idleTaskExecutionLimit = idleTaskExecutionLimit;
+ }
+
+ public int getReceiveTimeout() {
+ return receiveTimeout;
+ }
+
+ public void setReceiveTimeout(int receiveTimeout) {
+ this.receiveTimeout = receiveTimeout;
+ }
+
+ public int getCacheLevel() {
+ return cacheLevel;
+ }
+
+ public void setCacheLevel(int cacheLevel) {
+ this.cacheLevel = cacheLevel;
+ }
+
+ public int getInitialReconnectDuration() {
+ return initialReconnectDuration;
+ }
+
+ public void setInitialReconnectDuration(int initialReconnectDuration) {
+ this.initialReconnectDuration = initialReconnectDuration;
+ }
+
+ public double getReconnectionProgressionFactor() {
+ return reconnectionProgressionFactor;
+ }
+
+ public void setReconnectionProgressionFactor(double reconnectionProgressionFactor) {
+ this.reconnectionProgressionFactor = reconnectionProgressionFactor;
+ }
+
+ public long getMaxReconnectDuration() {
+ return maxReconnectDuration;
+ }
+
+ public void setMaxReconnectDuration(long maxReconnectDuration) {
+ this.maxReconnectDuration = maxReconnectDuration;
+ }
+
+ public int getMaxMessagesPerTask() {
+ return maxMessagesPerTask;
+ }
+
+ public void setMaxMessagesPerTask(int maxMessagesPerTask) {
+ this.maxMessagesPerTask = maxMessagesPerTask;
+ }
+
+ public String getUserTransactionJNDIName() {
+ return userTransactionJNDIName;
+ }
+
+ public void setUserTransactionJNDIName(String userTransactionJNDIName) {
+ if (userTransactionJNDIName != null) {
+ this.userTransactionJNDIName = userTransactionJNDIName;
+ }
+ }
+
+ public boolean isCacheUserTransaction() {
+ return cacheUserTransaction;
+ }
+
+ public void setCacheUserTransaction(Boolean cacheUserTransaction) {
+ if (cacheUserTransaction != null) {
+ this.cacheUserTransaction = cacheUserTransaction;
+ }
+ }
+
+ public boolean isJmsSpec11() {
+ return jmsSpec11;
+ }
+
+ public void setJmsSpec11(boolean jmsSpec11) {
+ this.jmsSpec11 = jmsSpec11;
+ }
+
+ public Hashtable<String, String> getJmsProperties() {
+ return jmsProperties;
+ }
+
+ public void addJmsProperties(Map<String, String> jmsProperties) {
+ this.jmsProperties.putAll(jmsProperties);
+ }
+
+ public void removeJmsProperties(String key) {
+ this.jmsProperties.remove(key);
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return conFactory;
+ }
+
+ public List<MessageListenerTask> getPollingTasks() {
+ return pollingTasks;
+ }
+
+ public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) {
+ this.jmsMessageReceiver = jmsMessageReceiver;
+ }
+
+ public void setWorkerPool(WorkerPool workerPool) {
+ this.workerPool = workerPool;
+ }
+
+ public int getActiveTaskCount() {
+ return activeTaskCount;
+ }
+
+ public void setServiceTaskManagerState(int serviceTaskManagerState) {
+ this.serviceTaskManagerState = serviceTaskManagerState;
+ }
+}
Added: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html?rev=724432&view=auto
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html (added)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html Mon Dec 8 10:15:40 2008
@@ -0,0 +1,356 @@
+<html>
+<title>JMS Transport Configuration</title>
+<body>
+
+<h2>JMS Listener Configuration (axis2.xml)</h2>
+
+e.g:
+
+<pre>
+ <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
+ <parameter name="myTopicConnectionFactory">
+ <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
+ <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
+ <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
+ <parameter name="transport.jms.ConnectionFactoryType">topic</parameter>
+ <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter>
+ </parameter>
+
+ <parameter name="myQueueConnectionFactory">
+ <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
+ <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
+ <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
+ <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
+ <parameter name="transport.jms.JMSSpecVersion">1.1</parameter>
+ </parameter>
+
+ <parameter name="default">
+ <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
+ <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
+ <parameter name="transport.jms.ConnectionFactoryJNDIName">ConnectionFactory</parameter>
+ </parameter>
+ </transportReceiver>
+
+ <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
+ <parameter name="myTopicConnectionFactory">
+ <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
+ <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
+ <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
+ <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter>
+ <parameter name="transport.jms.CacheLevel">producer</parameter>
+ </parameter>
+
+ <parameter name="myQueueConnectionFactory">
+ <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
+ <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
+ <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
+ <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter>
+ <parameter name="transport.jms.CacheLevel">producer</parameter>
+ </parameter>
+
+ <parameter name="default">
+ <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
+ <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
+ <parameter name="transport.jms.ConnectionFactoryJNDIName">ConnectionFactory</parameter>
+ <parameter name="transport.jms.CacheLevel">connection</parameter>
+ </parameter>
+ </transportSender>
+</pre>
+
+<p>
+ The Transport Listener and Sender both allows the user to configure one or more logical JMS Connection
+ Factories, which are named definitions as shown above. Any remaining parameters maybe defined at the
+ service level via the services.xml. The applicable set of parameters for a service would be the
+ union of the properties from the services.xml and the corresponding logical connection factory.
+</p>
+
+<TABLE WIDTH="100%" BORDER=1 BORDERCOLOR="#000000" CELLPADDING=4 CELLSPACING=0>
+ <COL WIDTH="10%">
+ <COL WIDTH="20%">
+ <COL WIDTH="60%">
+ <COL WIDTH="5%">
+ <COL WIDTH="5%">
+ <tr>
+ <td>Transport level</td>
+ <td><BR></td>
+ <td><BR></td>
+ <td>Listening</td>
+ <td>Sending</td>
+ </tr>
+ <tr>
+ <td>JNDI</td>
+ <td>java.naming.factory.initial</td>
+ <td>The JNDI InitialContext factory class</td>
+ <td>Required</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>java.naming.provider.url</td>
+ <td>JNDI Provider URL</td>
+ <td>Required</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>java.naming.security.principal</td>
+ <td>Username for JNDI access</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>java.naming.security.credentials</td>
+ <td>Password for JNDI access</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Transactions</td>
+ <td>transport.Transactionality</td>
+ <td>Desired transactionality. One of none / local / jta</td>
+ <td>Defaults to <B>none</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.UserTxnJNDIName</td>
+ <td>JNDI name to be used to obtain a UserTransaction</td>
+ <td>Defaults to "java:comp/UserTransaction"</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.CacheUserTxn</td>
+ <td>Generally its safe and more efficient to cache the
+ UserTransaction reference from JNDI. One of true/ false</td>
+ <td>Defaults to <B>true</B></td>
+ <td><BR></td>
+ </tr>
+
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.SessionTransacted</td>
+ <td>Should the JMS Session be transacted. One of true/ false</td>
+ <td>Defaults to <B>true</B> when local transactions are used</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.SessionAcknowledgement</td>
+ <td>JMS Session acknowledgement mode to be used. One of AUTO_ACKNOWLEDGE | CLIENT_ACKNOWLEDGE | DUPS_OK_ACKNOWLEDGE | SESSION_TRANSACTED</td>
+ <td>Defaults to <B>AUTO_ACKNOWLEDGE</B></td>
+ <td><BR></td>
+ </tr>
+
+ <tr>
+ <td>Connection</td>
+ <td>transport.jms.ConnectionFactory</td>
+ <td>Name of the logical connection factory this service will use</td>
+ <td>Defaults to "default"</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ConnectionFactoryJNDIName</td>
+ <td>The JNDI name of the JMS ConnectionFactory</td>
+ <td>Required</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ConnectionFactoryType</td>
+ <td> Type of ConnectionFactory – queue / topic</td>
+ <td>Suggested to be specified</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.JMSSpecVersion</td>
+ <td>JMS API Version One of "1.1" or "1.0.2b"</td>
+ <td>Defaults to 1.1</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.UserName</td>
+ <td>The JMS connection username</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.Password</td>
+ <td>The JMS connection password</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Destinations</td>
+ <td>transport.jms.Destination</td>
+ <td>JNDI Name of the Destination </td>
+ <td>Defaults to Service name</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.DestinationType</td>
+ <td>Type of Destination – queue / topic</td>
+ <td>Defaults to a queue</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.DefaultReplyDestination</td>
+ <td>JNDI Name of the default reply Destination</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.DefaultReplyDestinationType</td>
+ <td>Type of the reply Destination – queue / topic</td>
+ <td>Same type as of Destination</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Advanced</td>
+ <td>transport.jms.MessageSelector</td>
+ <td>Optional message selector to be applied</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.SubscriptionDurable</td>
+ <td>Is the subscription durable? (For Topics) – true / false</td>
+ <td>Defaults to <B>false</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.DurableSubscriberName</td>
+ <td>Name to be used for the durable subscription</td>
+ <td>Required when subscription is durable</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.PubSubNoLocal</td>
+ <td>Should messages published by the same connection (for Topics)
+ be received? – true / false</td>
+ <td>Defaults to <B>false</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.CacheLevel</td>
+ <td>The JMS resource cache level. One of none / connection /
+ session / consumer / producer / auto</td>
+ <td>Defaults to <B>auto</B> </td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ReceiveTimeout</td>
+ <td>Time to wait for a JMS message during polling. Negative means
+ wait forever, while 0 means do not wait at all. Anything else, is
+ a millisecond value for the poll</td>
+ <td>Defaults to 1000ms</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ConcurrentConsumers</td>
+ <td>Number of concurrent consumer tasks (~threads) to be started to
+ poll for messages for this service. For Topics, this should be
+ always 1, to prevent the same message being processed multiple
+ times</td>
+ <td>Defaults to <B>1</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.MaxConcurrentConsumers</td>
+ <td>Will dynamically scale the number of concurrent consumer tasks
+ (~threads) until this value; as the load increases. Should always
+ be 1 for Topics.</td>
+ <td>Defaults to <B>1</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.IdleTaskLimit</td>
+ <td>The number of idle (i.e. poll without receipt of a message)
+ runs per task, before it dies – to recycle resources, and to
+ allow dynamic scale down.</td>
+ <td>Defaults to 10</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.MaxMessagesPerTask</td>
+ <td>The maximum number of successful message receipts to limit per
+ Task lifetime. </td>
+ <td>Defaults to <B>–1</B> which implies unlimited messages</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Reconnection</td>
+ <td>transport.jms.InitialReconnectDuration</td>
+ <td>Initial reconnection attempt duration</td>
+ <td>Defaults to 10,000ms</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ReconnectProgressFactor</td>
+ <td>Factor used to compute consecutive reconnection attempt
+ durations, in a geometric series</td>
+ <td>Defaults to <B>2 (i.e. exponential)</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.MaxReconnectDuration</td>
+ <td>Maximum limit for a reconnection duration</td>
+ <td>Defaults to <B>1 hour</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.PublishEPR</td>
+ <td>One or more JMS URL's to be showed as the JMS EPRs on the WSDL
+ for the service. Allows the specification of a custom EPR, and/or
+ hiding of internal properties from a public EPR (e.g.
+ credentials). Add one as LEGACY to retain auto generated EPR, when
+ adding new EPRs</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td><BR></td>
+ <td><BR></td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Legacy Mode and Payload handling</td>
+ <td>Wrapper</td>
+ <td>Binary and Text payload wrapper element to be specified as "{ns}name" where ns refers to a namespace and name the name of the element</td>
+ <td>Default binary wrapper<ul><li>{http://ws.apache.org/commons/ns/payload}binary</li></ul>
+ Default text wrapper <ul><li>{http://ws.apache.org/commons/ns/payload}text</li></ul></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>Operation</td>
+ <td>operation name to be specified as "{ns}name" where ns refers to the namespace and name the name of the operation</td>
+ <td>Defaults to urn:mediate</td>
+ <td><BR></td>
+ </tr>
+</TABLE>
+
+</body>
+</html>
\ No newline at end of file
Modified: webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java (original)
+++ webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java Mon Dec 8 10:15:40 2008
@@ -71,20 +71,24 @@
this.messages = messages;
this.preloadMessages = preloadMessages;
}
+
+ private int concurrencyReached;
+ private final Object concurrencyReachedLock = new Object();
+ private final Object shutdownAwaitLock = new Object();
@Override
protected void runTest() throws Throwable {
int endpointCount = channels.length;
int expectedConcurrency = endpointCount * messages;
- final CountDownLatch shutdownLatch = new CountDownLatch(1);
- final CountDownLatch concurrencyReachedLatch = new CountDownLatch(expectedConcurrency);
-
final MessageReceiver messageReceiver = new MessageReceiver() {
public void receive(MessageContext msgContext) throws AxisFault {
- concurrencyReachedLatch.countDown();
+ synchronized (concurrencyReachedLock) {
+ concurrencyReached++;
+ concurrencyReachedLock.notifyAll();
+ }
try {
- shutdownLatch.await();
+ shutdownAwaitLock.wait();
} catch (InterruptedException ex) {
}
}
@@ -135,14 +139,25 @@
endpointResourceSets[i] = endpointResourceSet;
}
}
-
- if (!concurrencyReachedLatch.await(5, TimeUnit.SECONDS)) {
- fail("Concurrency reached is " + (expectedConcurrency -
- concurrencyReachedLatch.getCount()) + ", but expected " +
- expectedConcurrency);
+
+ long startTime = System.currentTimeMillis();
+ while (concurrencyReached < expectedConcurrency
+ && System.currentTimeMillis() < (startTime + 5000)) {
+ synchronized(concurrencyReachedLock) {
+ concurrencyReachedLock.wait(5000);
+ }
+ }
+
+ synchronized(shutdownAwaitLock) {
+ shutdownAwaitLock.notifyAll();
}
+
+ if (concurrencyReached < expectedConcurrency) {
+ fail("Concurrency reached is " + concurrencyReached + ", but expected " +
+ expectedConcurrency);
+ }
+
} finally {
- shutdownLatch.countDown();
for (int i=0; i<endpointCount; i++) {
if (endpointResourceSets[i] != null) {
endpointResourceSets[i].tearDown();
Modified: webservices/commons/trunk/modules/transport/modules/tests/log4j.properties
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/log4j.properties?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/log4j.properties (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/log4j.properties Mon Dec 8 10:15:40 2008
@@ -20,8 +20,12 @@
# log4j configuration file used by unit tests
log4j.rootCategory=DEBUG, CONSOLE
+#log4j.rootCategory=WARN, CONSOLE
+
+log4j.category.org.apache.axis2.transport.jms=TRACE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.threshold=ERROR
+#log4j.appender.CONSOLE.threshold=TRACE
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%5p [%t] %c{1} %m%n
Modified: webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java Mon Dec 8 10:15:40 2008
@@ -137,8 +137,8 @@
}
public void setupService(AxisService service, boolean isClientSide) throws Exception {
- service.addParameter(JMSConstants.CONFAC_PARAM, connectionFactoryName);
- service.addParameter(JMSConstants.DEST_PARAM_TYPE, destinationType);
- service.addParameter(JMSConstants.DEST_PARAM, jndiName);
+ service.addParameter(JMSConstants.PARAM_JMS_CONFAC, connectionFactoryName);
+ service.addParameter(JMSConstants.PARAM_DEST_TYPE, destinationType);
+ service.addParameter(JMSConstants.PARAM_DESTINATION, jndiName);
}
}
Modified: webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java Mon Dec 8 10:15:40 2008
@@ -63,8 +63,8 @@
@Override
public void setupService(AxisService service, boolean isClientSide) throws Exception {
super.setupService(service, isClientSide);
- service.addParameter(JMSConstants.REPLY_PARAM_TYPE, replyDestinationType);
- service.addParameter(JMSConstants.REPLY_PARAM, replyJndiName);
+ service.addParameter(JMSConstants.PARAM_REPLY_DEST_TYPE, replyDestinationType);
+ service.addParameter(JMSConstants.PARAM_REPLY_DESTINATION, replyJndiName);
}
public void setupRequestMessageContext(MessageContext msgContext) {
@@ -74,7 +74,7 @@
@Override
public EndpointReference getEndpointReference() throws Exception {
String address = super.getEndpointReference().getAddress();
- return new EndpointReference(address + "&" + JMSConstants.REPLY_PARAM_TYPE + "=" + replyDestinationType + "&" + JMSConstants.REPLY_PARAM + "=" + replyJndiName);
+ return new EndpointReference(address + "&" + JMSConstants.PARAM_REPLY_DEST_TYPE + "=" + replyDestinationType + "&" + JMSConstants.PARAM_REPLY_DESTINATION + "=" + replyJndiName);
}
@Key("replyDestType")
Modified: webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java Mon Dec 8 10:15:40 2008
@@ -47,6 +47,7 @@
private final boolean singleCF;
private final boolean cfOnSender;
+ private final int concurrentConsumers;
private @Transient Context context;
/**
@@ -58,9 +59,10 @@
* should also be configured on the sender. This switch allows
* us to build regression tests for SYNAPSE-448.
*/
- public JMSTransportDescriptionFactory(boolean singleCF, boolean cfOnSender) {
+ public JMSTransportDescriptionFactory(boolean singleCF, boolean cfOnSender, int concurrentConsumers) {
this.singleCF = singleCF;
this.cfOnSender = cfOnSender;
+ this.concurrentConsumers = concurrentConsumers;
}
@Setup @SuppressWarnings("unused")
@@ -108,18 +110,20 @@
OMElement element = createParameterElement(JMSConstants.DEFAULT_CONFAC_NAME, null);
element.addChild(createParameterElement(Context.INITIAL_CONTEXT_FACTORY,
MockContextFactory.class.getName()));
- element.addChild(createParameterElement(JMSConstants.CONFAC_JNDI_NAME_PARAM,
+ element.addChild(createParameterElement(JMSConstants.PARAM_CONFAC_JNDI_NAME,
connFactName));
if (type != null) {
- element.addChild(createParameterElement(JMSConstants.CONFAC_TYPE, type));
+ element.addChild(createParameterElement(JMSConstants.PARAM_CONFAC_TYPE, type));
}
+ element.addChild(createParameterElement(JMSConstants.PARAM_CONCURRENT_CONSUMERS,
+ Integer.toString(concurrentConsumers)));
trpDesc.addParameter(new Parameter(name, element));
}
private void setupTransport(ParameterInclude trpDesc) throws AxisFault {
if (singleCF) {
// TODO: setting the type to "queue" is nonsense, but required by the transport (see SYNAPSE-439)
- setupConnectionFactoryConfig(trpDesc, "default", CONNECTION_FACTORY, "queue");
+ setupConnectionFactoryConfig(trpDesc, "default", CONNECTION_FACTORY, null);
} else {
setupConnectionFactoryConfig(trpDesc, "queue", QUEUE_CONNECTION_FACTORY, "queue");
setupConnectionFactoryConfig(trpDesc, "topic", TOPIC_CONNECTION_FACTORY, "topic");
Modified: webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java Mon Dec 8 10:15:40 2008
@@ -51,14 +51,19 @@
// SYNAPSE-436:
suite.addExclude("(&(test=EchoXML)(replyDestType=topic)(endpoint=axis))");
-
+
+ // Example to run a few use cases.. please leave these commented out - asankha
+ //suite.addExclude("(|(test=AsyncXML)(test=MinConcurrency)(destType=topic)(broker=qpid)(destType=topic)(replyDestType=topic)(client=jms)(endpoint=mock)(cfOnSender=true))");
+ //suite.addExclude("(|(test=EchoXML)(destType=queue)(broker=qpid)(cfOnSender=true)(singleCF=false)(destType=queue)(client=jms)(endpoint=mock))");
+ //suite.addExclude("(|(test=EchoXML)(test=AsyncXML)(test=AsyncSwA)(test=AsyncTextPlain)(test=AsyncBinary)(test=AsyncSOAPLarge)(broker=qpid))");
+
TransportTestSuiteBuilder builder = new TransportTestSuiteBuilder(suite);
JMSTestEnvironment[] environments = new JMSTestEnvironment[] { new QpidTestEnvironment(), new ActiveMQTestEnvironment() };
for (boolean singleCF : new boolean[] { false, true }) {
for (boolean cfOnSender : new boolean[] { false, true }) {
for (JMSTestEnvironment env : environments) {
- builder.addEnvironment(env, new JMSTransportDescriptionFactory(singleCF, cfOnSender));
+ builder.addEnvironment(env, new JMSTransportDescriptionFactory(singleCF, cfOnSender, 1));
}
}
}
@@ -87,12 +92,12 @@
builder.addEchoEndpoint(new MockEchoEndpoint());
builder.addEchoEndpoint(new AxisEchoEndpoint());
-
+
for (JMSTestEnvironment env : new JMSTestEnvironment[] { new QpidTestEnvironment(), new ActiveMQTestEnvironment() }) {
suite.addTest(new MinConcurrencyTest(new AsyncChannel[] {
new JMSAsyncChannel("endpoint1", JMSConstants.DESTINATION_TYPE_QUEUE, ContentTypeMode.TRANSPORT),
new JMSAsyncChannel("endpoint2", JMSConstants.DESTINATION_TYPE_QUEUE, ContentTypeMode.TRANSPORT) },
- 2, false, env, new JMSTransportDescriptionFactory(false, false)));
+ 2, false, env, new JMSTransportDescriptionFactory(false, false, 2)));
}