You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by di...@apache.org on 2006/01/06 05:26:57 UTC
svn commit: r366421 [3/3] - in /webservices/axis2/trunk/java/modules:
adb/src/org/apache/axis2/databinding/types/
adb/src/org/apache/axis2/databinding/utils/
codegen/src/org/apache/axis2/schema/
codegen/src/org/apache/axis2/schema/writer/ codegen/src/o...
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java?rev=366421&r1=366420&r2=366421&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java Thu Jan 5 20:26:39 2006
@@ -1,897 +1,900 @@
-/*
-* Copyright 2001, 2002,2004 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-
-package org.apache.axis2.transport.jms;
-
-import javax.jms.BytesMessage;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import java.io.ByteArrayOutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-
-/**
- * JMSConnector is an abstract class that encapsulates the work of connecting
- * to JMS destinations. Its subclasses are TopicConnector and QueueConnector
- * which further specialize connections to the pub-sub and the ptp domains.
- * It also implements the capability to retry connections in the event of
- * failures.
- */
-public abstract class JMSConnector {
- protected JMSVendorAdapter m_adapter;
- protected boolean m_allowReceive;
- protected long m_connectRetryInterval;
- protected long m_interactRetryInterval;
- protected JMSURLHelper m_jmsurl;
- protected int m_numRetries;
- protected int m_numSessions;
- protected long m_poolTimeout;
- protected AsyncConnection m_receiveConnection;
- protected SyncConnection m_sendConnection;
- protected long m_timeoutTime;
-
- public JMSConnector(ConnectionFactory connectionFactory, int numRetries, int numSessions,
- long connectRetryInterval, long interactRetryInterval, long timeoutTime,
- boolean allowReceive, String clientID, String username, String password,
- JMSVendorAdapter adapter, JMSURLHelper jmsurl)
- throws JMSException {
- m_numRetries = numRetries;
- m_connectRetryInterval = connectRetryInterval;
- m_interactRetryInterval = interactRetryInterval;
- m_timeoutTime = timeoutTime;
- m_poolTimeout = timeoutTime / (long) numRetries;
- m_numSessions = numSessions;
- m_allowReceive = allowReceive;
- m_adapter = adapter;
- m_jmsurl = jmsurl;
-
- // try to connect initially so we can fail fast
- // in the case of irrecoverable errors.
- // If we fail in a recoverable fashion we will retry
- javax.jms.Connection sendConnection = createConnectionWithRetry(connectionFactory,
- username, password);
-
- m_sendConnection = createSyncConnection(connectionFactory, sendConnection, m_numSessions,
- "SendThread", clientID, username, password);
- m_sendConnection.start();
-
- if (m_allowReceive) {
- javax.jms.Connection receiveConnection = createConnectionWithRetry(connectionFactory,
- username, password);
-
- m_receiveConnection = createAsyncConnection(connectionFactory, receiveConnection,
- "ReceiveThread", clientID, username, password);
- m_receiveConnection.start();
- }
- }
-
- protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
- javax.jms.Connection connection, String threadName, String clientID, String username,
- String password)
- throws JMSException;
-
- protected javax.jms.Connection createConnectionWithRetry(ConnectionFactory connectionFactory,
- String username, String password)
- throws JMSException {
- javax.jms.Connection connection = null;
-
- for (int numTries = 1; connection == null; numTries++) {
- try {
- connection = internalConnect(connectionFactory, username, password);
- } catch (JMSException jmse) {
- if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION)
- || (numTries == m_numRetries)) {
- throw jmse;
- } else {
- try {
- Thread.sleep(m_connectRetryInterval);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
-
- return connection;
- }
-
- public abstract JMSEndpoint createEndpoint(Destination destination) throws JMSException;
-
- public abstract JMSEndpoint createEndpoint(String destinationName) throws JMSException;
-
- protected abstract SyncConnection createSyncConnection(ConnectionFactory factory,
- javax.jms.Connection connection, int numSessions, String threadName, String clientID,
- String username, String password)
- throws JMSException;
-
- protected abstract javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
- String username, String password)
- throws JMSException;
-
- public int numSessions() {
- return m_numSessions;
- }
-
- public void shutdown() {
- m_sendConnection.shutdown();
-
- if (m_allowReceive) {
- m_receiveConnection.shutdown();
- }
- }
-
- public void start() {
- m_sendConnection.startConnection();
-
- if (m_allowReceive) {
- m_receiveConnection.startConnection();
- }
-
- JMSConnectorManager.getInstance().addConnectorToPool(this);
- }
-
- public void stop() {
- JMSConnectorManager.getInstance().removeConnectorFromPool(this);
- m_sendConnection.stopConnection();
-
- if (m_allowReceive) {
- m_receiveConnection.stopConnection();
- }
- }
-
- public String getClientID() {
- return getSendConnection().getClientID();
- }
-
- public ConnectionFactory getConnectionFactory() {
-
- // there is always a send connection
- return getSendConnection().getConnectionFactory();
- }
-
- public JMSURLHelper getJMSURL() {
- return m_jmsurl;
- }
-
- public int getNumRetries() {
- return m_numRetries;
- }
-
- public String getPassword() {
- return getSendConnection().getPassword();
- }
-
- AsyncConnection getReceiveConnection() {
- return m_receiveConnection;
- }
-
- SyncConnection getSendConnection() {
- return m_sendConnection;
- }
-
- public String getUsername() {
- return getSendConnection().getUsername();
- }
-
- public JMSVendorAdapter getVendorAdapter() {
- return m_adapter;
- }
-
- protected abstract class AsyncConnection extends Connection {
- Object m_subscriptionLock;
- HashMap m_subscriptions;
-
- protected AsyncConnection(ConnectionFactory connectionFactory,
- javax.jms.Connection connection, String threadName,
- String clientID, String username, String password)
- throws JMSException {
- super(connectionFactory, connection, threadName, clientID, username, password);
- m_subscriptions = new HashMap();
- m_subscriptionLock = new Object();
- }
-
- protected abstract ListenerSession createListenerSession(javax.jms.Connection connection,
- Subscription subscription)
- throws Exception;
-
- protected void onConnect() throws Exception {
- synchronized (m_subscriptionLock) {
- Iterator subscriptions = m_subscriptions.keySet().iterator();
-
- while (subscriptions.hasNext()) {
- Subscription subscription = (Subscription) subscriptions.next();
-
- if (m_subscriptions.get(subscription) == null) {
- m_subscriptions.put(subscription,
- createListenerSession(m_connection, subscription));
- }
- }
-
- m_subscriptionLock.notifyAll();
- }
- }
-
- protected void onException() {
- synchronized (m_subscriptionLock) {
- Iterator subscriptions = m_subscriptions.keySet().iterator();
-
- while (subscriptions.hasNext()) {
- Subscription subscription = (Subscription) subscriptions.next();
-
- m_subscriptions.put(subscription, null);
- }
- }
- }
-
- protected void onShutdown() {
- synchronized (m_subscriptionLock) {
- Iterator subscriptions = m_subscriptions.keySet().iterator();
-
- while (subscriptions.hasNext()) {
- Subscription subscription = (Subscription) subscriptions.next();
- ListenerSession session =
- (ListenerSession) m_subscriptions.get(subscription);
-
- if (session != null) {
- session.cleanup();
- }
- }
-
- m_subscriptions.clear();
- }
- }
-
- /**
- * @param subscription
- * @todo add in security exception propagation
- */
- void subscribe(Subscription subscription) throws Exception {
- long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
-
- synchronized (m_subscriptionLock) {
- if (m_subscriptions.containsKey(subscription)) {
- return;
- }
-
- while (true) {
- if (System.currentTimeMillis() > timeoutTime) {
- throw new InvokeTimeoutException("Cannot subscribe listener");
- }
-
- try {
- ListenerSession session = createListenerSession(m_connection, subscription);
-
- m_subscriptions.put(subscription, session);
-
- break;
- } catch (JMSException jmse) {
- if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) {
- throw jmse;
- }
-
- try {
- m_subscriptionLock.wait(m_interactRetryInterval);
- } catch (InterruptedException ignore) {
- }
-
- // give reconnect a chance
- Thread.yield();
-
- continue;
- } catch (NullPointerException jmse) {
-
- // we ARE reconnecting
- try {
- m_subscriptionLock.wait(m_interactRetryInterval);
- } catch (InterruptedException ignore) {
- }
-
- // give reconnect a chance
- Thread.yield();
-
- continue;
- }
- }
- }
- }
-
- void unsubscribe(Subscription subscription) {
- long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
-
- synchronized (m_subscriptionLock) {
- if (!m_subscriptions.containsKey(subscription)) {
- return;
- }
-
- while (true) {
- if (System.currentTimeMillis() > timeoutTime) {
- throw new InvokeTimeoutException("Cannot unsubscribe listener");
- }
-
- // give reconnect a chance
- Thread.yield();
-
- try {
- ListenerSession session =
- (ListenerSession) m_subscriptions.get(subscription);
-
- session.cleanup();
- m_subscriptions.remove(subscription);
-
- break;
- } catch (NullPointerException jmse) {
-
- // we are reconnecting
- try {
- m_subscriptionLock.wait(m_interactRetryInterval);
- } catch (InterruptedException ignore) {
- }
-
- continue;
- }
- }
- }
- }
-
- protected class ListenerSession extends ConnectorSession {
- protected MessageConsumer m_consumer;
- protected Subscription m_subscription;
-
- ListenerSession(Session session, MessageConsumer consumer, Subscription subscription)
- throws Exception {
- super(session);
- m_subscription = subscription;
- m_consumer = consumer;
-
- Destination destination = subscription.m_endpoint.getDestination(m_session);
-
- m_consumer.setMessageListener(subscription.m_listener);
- }
-
- void cleanup() {
- try {
- m_consumer.close();
- } catch (Exception ignore) {
- }
-
- try {
- m_session.close();
- } catch (Exception ignore) {
- }
- }
- }
- }
-
-
- private abstract class Connection extends Thread implements ExceptionListener {
- private String m_clientID;
- protected javax.jms.Connection m_connection;
- private ConnectionFactory m_connectionFactory;
- protected boolean m_isActive;
- private Object m_jmsLock;
- private Object m_lifecycleLock;
- private boolean m_needsToConnect;
- private String m_password;
- private boolean m_startConnection;
- private String m_username;
-
- protected Connection(ConnectionFactory connectionFactory, javax.jms.Connection connection,
- String threadName, String clientID, String username, String password)
- throws JMSException {
- super(threadName);
- m_connectionFactory = connectionFactory;
- m_clientID = clientID;
- m_username = username;
- m_password = password;
- m_jmsLock = new Object();
- m_lifecycleLock = new Object();
-
- if (connection != null) {
- m_needsToConnect = false;
- m_connection = connection;
- m_connection.setExceptionListener(this);
-
- if (m_clientID != null) {
- m_connection.setClientID(m_clientID);
- }
- } else {
- m_needsToConnect = true;
- }
-
- m_isActive = true;
- }
-
- private void internalOnConnect() throws Exception {
- onConnect();
-
- synchronized (m_lifecycleLock) {
- if (m_startConnection) {
- try {
- m_connection.start();
- } catch (Throwable e) {
- } // ignore
- }
- }
- }
-
- private void internalOnShutdown() {
- stopConnection();
- onShutdown();
-
- try {
- m_connection.close();
- } catch (Throwable e) {
- } // ignore
- }
-
- protected abstract void onConnect() throws Exception;
-
- protected abstract void onException();
-
- public void onException(JMSException exception) {
- if (m_adapter.isRecoverable(exception, JMSVendorAdapter.ON_EXCEPTION_ACTION)) {
- return;
- }
-
- onException();
-
- synchronized (m_jmsLock) {
- m_jmsLock.notifyAll();
- }
- }
-
- protected abstract void onShutdown();
-
- /**
- * @todo handle non-recoverable errors
- */
- public void run() {
-
- // loop until a connection is made and when a connection is made (re)establish
- // any subscriptions
- while (m_isActive) {
- if (m_needsToConnect) {
- m_connection = null;
-
- try {
- m_connection = internalConnect(m_connectionFactory, m_username, m_password);
- m_connection.setExceptionListener(this);
-
- if (m_clientID != null) {
- m_connection.setClientID(m_clientID);
- }
- } catch (JMSException e) {
-
- // simply backoff for a while and then retry
- try {
- Thread.sleep(m_connectRetryInterval);
- } catch (InterruptedException ie) {
- }
-
- continue;
- }
- } else {
- m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because
- }
-
- // we lost the connection
-
- // we now have a valid connection so establish some context
- try {
- internalOnConnect();
- } catch (Exception e) {
-
- // insert code to handle non recoverable errors
- // simply retry
- continue;
- }
-
- synchronized (m_jmsLock) {
- try {
- m_jmsLock.wait();
- } catch (InterruptedException ie) {
- } // until notified due to some change in status
- }
- }
-
- // no longer staying connected, so see what we can cleanup
- internalOnShutdown();
- }
-
- void shutdown() {
- m_isActive = false;
-
- synchronized (m_jmsLock) {
- m_jmsLock.notifyAll();
- }
- }
-
- void startConnection() {
- synchronized (m_lifecycleLock) {
- if (m_startConnection) {
- return;
- }
-
- m_startConnection = true;
-
- try {
- m_connection.start();
- } catch (Throwable e) {
- } // ignore
- }
- }
-
- void stopConnection() {
- synchronized (m_lifecycleLock) {
- if (!m_startConnection) {
- return;
- }
-
- m_startConnection = false;
-
- try {
- m_connection.stop();
- } catch (Throwable e) {
- } // ignore
- }
- }
-
- public String getClientID() {
- return m_clientID;
- }
-
- public ConnectionFactory getConnectionFactory() {
- return m_connectionFactory;
- }
-
- public String getPassword() {
- return m_password;
- }
-
- public String getUsername() {
- return m_username;
- }
- }
-
-
- private abstract class ConnectorSession {
- Session m_session;
-
- ConnectorSession(Session session) throws JMSException {
- m_session = session;
- }
- }
-
-
- protected abstract class SyncConnection extends Connection {
- int m_numSessions;
- Object m_senderLock;
- LinkedList m_senders;
-
- SyncConnection(ConnectionFactory connectionFactory, javax.jms.Connection connection,
- int numSessions, String threadName, String clientID, String username,
- String password)
- throws JMSException {
- super(connectionFactory, connection, threadName, clientID, username, password);
- m_senders = new LinkedList();
- m_numSessions = numSessions;
- m_senderLock = new Object();
- }
-
- byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
- throws Exception {
- long timeoutTime = System.currentTimeMillis() + timeout;
-
- while (true) {
- if (System.currentTimeMillis() > timeoutTime) {
- throw new InvokeTimeoutException("Unable to complete call in time allotted");
- }
-
- SendSession sendSession = null;
-
- try {
- sendSession = getSessionFromPool(m_poolTimeout);
-
- byte[] response = sendSession.call(endpoint, message,
- timeoutTime - System.currentTimeMillis(),
- properties);
-
- returnSessionToPool(sendSession);
-
- if (response == null) {
- throw new InvokeTimeoutException(
- "Unable to complete call in time allotted");
- }
-
- return response;
- } catch (JMSException jmse) {
- if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
-
- // this we cannot recover from
- // but it does not invalidate the session
- returnSessionToPool(sendSession);
-
- throw jmse;
- }
-
- // for now we will assume this is a reconnect related issue
- // and let the sender be collected
- // give the reconnect thread a chance to fill the pool
- Thread.yield();
-
- continue;
- } catch (NullPointerException npe) {
- Thread.yield();
-
- continue;
- }
- }
- }
-
- protected abstract SendSession createSendSession(javax.jms.Connection connection)
- throws JMSException;
-
- protected void onConnect() throws JMSException {
- synchronized (m_senderLock) {
- for (int i = 0; i < m_numSessions; i++) {
- m_senders.add(createSendSession(m_connection));
- }
-
- m_senderLock.notifyAll();
- }
- }
-
- protected void onException() {
- synchronized (m_senderLock) {
- m_senders.clear();
- }
- }
-
- protected void onShutdown() {
- synchronized (m_senderLock) {
- Iterator senders = m_senders.iterator();
-
- while (senders.hasNext()) {
- SendSession session = (SendSession) senders.next();
-
- session.cleanup();
- }
-
- m_senders.clear();
- }
- }
-
- private void returnSessionToPool(SendSession sendSession) {
- synchronized (m_senderLock) {
- m_senders.addLast(sendSession);
- m_senderLock.notifyAll();
- }
- }
-
- /**
- * @todo add in handling for security exceptions
- * @todo add support for timeouts
- */
- void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception {
- long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
-
- while (true) {
- if (System.currentTimeMillis() > timeoutTime) {
- throw new InvokeTimeoutException("Cannot complete send in time allotted");
- }
-
- SendSession sendSession = null;
-
- try {
- sendSession = getSessionFromPool(m_poolTimeout);
- sendSession.send(endpoint, message, properties);
- returnSessionToPool(sendSession);
- } catch (JMSException jmse) {
- if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
-
- // this we cannot recover from
- // but it does not invalidate the session
- returnSessionToPool(sendSession);
-
- throw jmse;
- }
-
- // for now we will assume this is a reconnect related issue
- // and let the sender be collected
- // give the reconnect thread a chance to fill the pool
- Thread.yield();
-
- continue;
- } catch (NullPointerException npe) {
-
- // give the reconnect thread a chance to fill the pool
- Thread.yield();
-
- continue;
- }
-
- break;
- }
- }
-
- private SendSession getSessionFromPool(long timeout) {
- synchronized (m_senderLock) {
- while (m_senders.size() == 0) {
- try {
- m_senderLock.wait(timeout);
-
- if (m_senders.size() == 0) {
- return null;
- }
- } catch (InterruptedException ignore) {
- return null;
- }
- }
-
- return (SendSession) m_senders.removeFirst();
- }
- }
-
- protected abstract class SendSession extends ConnectorSession {
- MessageProducer m_producer;
-
- SendSession(Session session, MessageProducer producer) throws JMSException {
- super(session);
- m_producer = producer;
- }
-
- byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
- throws Exception {
- Destination reply = createTemporaryDestination();
- MessageConsumer subscriber = createConsumer(reply);
- BytesMessage jmsMessage = m_session.createBytesMessage();
-
- jmsMessage.writeBytes(message);
- jmsMessage.setJMSReplyTo(reply);
-
- int deliveryMode = extractDeliveryMode(properties);
- int priority = extractPriority(properties);
- long timeToLive = extractTimeToLive(properties);
-
- if ((properties != null) && !properties.isEmpty()) {
- setProperties(properties, jmsMessage);
- }
-
- send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority,
- timeToLive);
-
- BytesMessage response = null;
-
- try {
- response = (BytesMessage) subscriber.receive(timeout);
- } catch (ClassCastException cce) {
- throw new InvokeException(
- "Error: unexpected message type received - expected BytesMessage");
- }
-
- byte[] respBytes = null;
-
- if (response != null) {
- byte[] buffer = new byte[8 * 1024];
- ByteArrayOutputStream out = new ByteArrayOutputStream();
-
- for (int bytesRead = response.readBytes(buffer); bytesRead != -1;
- bytesRead = response.readBytes(buffer)) {
- out.write(buffer, 0, bytesRead);
- }
-
- respBytes = out.toByteArray();
- }
-
- subscriber.close();
- deleteTemporaryDestination(reply);
-
- return respBytes;
- }
-
- void cleanup() {
- try {
- m_producer.close();
- } catch (Throwable t) {
- }
-
- try {
- m_session.close();
- } catch (Throwable t) {
- }
- }
-
- protected abstract MessageConsumer createConsumer(Destination destination)
- throws JMSException;
-
- protected abstract Destination createTemporaryDestination() throws JMSException;
-
- protected abstract void deleteTemporaryDestination(Destination destination)
- throws JMSException;
-
- private int extractDeliveryMode(HashMap properties) {
- return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
- JMSConstants.DEFAULT_DELIVERY_MODE);
- }
-
- private int extractPriority(HashMap properties) {
- return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
- JMSConstants.DEFAULT_PRIORITY);
- }
-
- private long extractTimeToLive(HashMap properties) {
- return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
- JMSConstants.DEFAULT_TIME_TO_LIVE);
- }
-
- void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception {
- BytesMessage jmsMessage = m_session.createBytesMessage();
-
- jmsMessage.writeBytes(message);
-
- int deliveryMode = extractDeliveryMode(properties);
- int priority = extractPriority(properties);
- long timeToLive = extractTimeToLive(properties);
-
- if ((properties != null) && !properties.isEmpty()) {
- setProperties(properties, jmsMessage);
- }
-
- send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority,
- timeToLive);
- }
-
- protected abstract void send(Destination destination, Message message,
- int deliveryMode, int priority, long timeToLive)
- throws JMSException;
-
- private void setProperties(HashMap properties, Message message) throws JMSException {
- Iterator propertyIter = properties.entrySet().iterator();
-
- while (propertyIter.hasNext()) {
- Map.Entry property = (Map.Entry) propertyIter.next();
-
- setProperty((String) property.getKey(), property.getValue(), message);
- }
- }
-
- private void setProperty(String property, Object value, Message message)
- throws JMSException {
- if (property == null) {
- return;
- }
-
- if (property.equals(JMSConstants.JMS_CORRELATION_ID)) {
- message.setJMSCorrelationID((String) value);
- } else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES)) {
- message.setJMSCorrelationIDAsBytes((byte[]) value);
- } else if (property.equals(JMSConstants.JMS_TYPE)) {
- message.setJMSType((String) value);
- } else {
- message.setObjectProperty(property, value);
- }
- }
- }
- }
-}
+/*
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+package org.apache.axis2.transport.jms;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * JMSConnector is an abstract class that encapsulates the work of connecting
+ * to JMS destinations. Its subclasses are TopicConnector and QueueConnector
+ * which further specialize connections to the pub-sub and the ptp domains.
+ * It also implements the capability to retry connections in the event of
+ * failures.
+ */
+public abstract class JMSConnector {
+ protected JMSVendorAdapter m_adapter;
+ protected boolean m_allowReceive;
+ protected long m_connectRetryInterval;
+ protected long m_interactRetryInterval;
+ protected JMSURLHelper m_jmsurl;
+ protected int m_numRetries;
+ protected int m_numSessions;
+ protected long m_poolTimeout;
+ protected AsyncConnection m_receiveConnection;
+ protected SyncConnection m_sendConnection;
+ protected long m_timeoutTime;
+
+ public JMSConnector(ConnectionFactory connectionFactory, int numRetries, int numSessions,
+ long connectRetryInterval, long interactRetryInterval, long timeoutTime,
+ boolean allowReceive, String clientID, String username, String password,
+ JMSVendorAdapter adapter, JMSURLHelper jmsurl)
+ throws JMSException {
+ m_numRetries = numRetries;
+ m_connectRetryInterval = connectRetryInterval;
+ m_interactRetryInterval = interactRetryInterval;
+ m_timeoutTime = timeoutTime;
+ m_poolTimeout = timeoutTime / (long) numRetries;
+ m_numSessions = numSessions;
+ m_allowReceive = allowReceive;
+ m_adapter = adapter;
+ m_jmsurl = jmsurl;
+
+ // try to connect initially so we can fail fast
+ // in the case of irrecoverable errors.
+ // If we fail in a recoverable fashion we will retry
+ javax.jms.Connection sendConnection = createConnectionWithRetry(connectionFactory,
+ username, password);
+
+ m_sendConnection = createSyncConnection(connectionFactory, sendConnection, m_numSessions,
+ "SendThread", clientID, username, password);
+ m_sendConnection.start();
+
+ if (m_allowReceive) {
+ javax.jms.Connection receiveConnection = createConnectionWithRetry(connectionFactory,
+ username, password);
+
+ m_receiveConnection = createAsyncConnection(connectionFactory, receiveConnection,
+ "ReceiveThread", clientID, username, password);
+ m_receiveConnection.start();
+ }
+ }
+
+ protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection, String threadName, String clientID, String username,
+ String password)
+ throws JMSException;
+
+ protected javax.jms.Connection createConnectionWithRetry(ConnectionFactory connectionFactory,
+ String username, String password)
+ throws JMSException {
+ javax.jms.Connection connection = null;
+
+ for (int numTries = 1; connection == null; numTries++) {
+ try {
+ connection = internalConnect(connectionFactory, username, password);
+ } catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION)
+ || (numTries == m_numRetries)) {
+ throw jmse;
+ } else {
+ try {
+ Thread.sleep(m_connectRetryInterval);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ }
+
+ return connection;
+ }
+
+ public abstract JMSEndpoint createEndpoint(Destination destination) throws JMSException;
+
+ public abstract JMSEndpoint createEndpoint(String destinationName) throws JMSException;
+
+ protected abstract SyncConnection createSyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection, int numSessions, String threadName, String clientID,
+ String username, String password)
+ throws JMSException;
+
+ protected abstract javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
+ String username, String password)
+ throws JMSException;
+
+ public int numSessions() {
+ return m_numSessions;
+ }
+
+ public void shutdown() {
+ m_sendConnection.shutdown();
+
+ if (m_allowReceive) {
+ m_receiveConnection.shutdown();
+ }
+ }
+
+ public void start() {
+ m_sendConnection.startConnection();
+
+ if (m_allowReceive) {
+ m_receiveConnection.startConnection();
+ }
+
+ JMSConnectorManager.getInstance().addConnectorToPool(this);
+ }
+
+ public void stop() {
+ JMSConnectorManager.getInstance().removeConnectorFromPool(this);
+ m_sendConnection.stopConnection();
+
+ if (m_allowReceive) {
+ m_receiveConnection.stopConnection();
+ }
+ }
+
+ public String getClientID() {
+ return getSendConnection().getClientID();
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+
+ // there is always a send connection
+ return getSendConnection().getConnectionFactory();
+ }
+
+ public JMSURLHelper getJMSURL() {
+ return m_jmsurl;
+ }
+
+ public int getNumRetries() {
+ return m_numRetries;
+ }
+
+ public String getPassword() {
+ return getSendConnection().getPassword();
+ }
+
+ AsyncConnection getReceiveConnection() {
+ return m_receiveConnection;
+ }
+
+ SyncConnection getSendConnection() {
+ return m_sendConnection;
+ }
+
+ public String getUsername() {
+ return getSendConnection().getUsername();
+ }
+
+ public JMSVendorAdapter getVendorAdapter() {
+ return m_adapter;
+ }
+
+ protected abstract class AsyncConnection extends Connection {
+ Object m_subscriptionLock;
+ HashMap m_subscriptions;
+
+ protected AsyncConnection(ConnectionFactory connectionFactory,
+ javax.jms.Connection connection, String threadName,
+ String clientID, String username, String password)
+ throws JMSException {
+ super(connectionFactory, connection, threadName, clientID, username, password);
+ m_subscriptions = new HashMap();
+ m_subscriptionLock = new Object();
+ }
+
+ protected abstract ListenerSession createListenerSession(javax.jms.Connection connection,
+ Subscription subscription)
+ throws Exception;
+
+ protected void onConnect() throws Exception {
+ synchronized (m_subscriptionLock) {
+ Iterator subscriptions = m_subscriptions.keySet().iterator();
+
+ while (subscriptions.hasNext()) {
+ Subscription subscription = (Subscription) subscriptions.next();
+
+ if (m_subscriptions.get(subscription) == null) {
+ m_subscriptions.put(subscription,
+ createListenerSession(m_connection, subscription));
+ }
+ }
+
+ m_subscriptionLock.notifyAll();
+ }
+ }
+
+ protected void onException() {
+ synchronized (m_subscriptionLock) {
+ Iterator subscriptions = m_subscriptions.keySet().iterator();
+
+ while (subscriptions.hasNext()) {
+ Subscription subscription = (Subscription) subscriptions.next();
+
+ m_subscriptions.put(subscription, null);
+ }
+ }
+ }
+
+ protected void onShutdown() {
+ synchronized (m_subscriptionLock) {
+ Iterator subscriptions = m_subscriptions.keySet().iterator();
+
+ while (subscriptions.hasNext()) {
+ Subscription subscription = (Subscription) subscriptions.next();
+ ListenerSession session =
+ (ListenerSession) m_subscriptions.get(subscription);
+
+ if (session != null) {
+ session.cleanup();
+ }
+ }
+
+ m_subscriptions.clear();
+ }
+ }
+
+ /**
+ * @param subscription
+ *
+ */
+
+ void subscribe(Subscription subscription) throws Exception {
+ // TODO: add in security exception propagation
+ long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+
+ synchronized (m_subscriptionLock) {
+ if (m_subscriptions.containsKey(subscription)) {
+ return;
+ }
+
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Cannot subscribe listener");
+ }
+
+ try {
+ ListenerSession session = createListenerSession(m_connection, subscription);
+
+ m_subscriptions.put(subscription, session);
+
+ break;
+ } catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) {
+ throw jmse;
+ }
+
+ try {
+ m_subscriptionLock.wait(m_interactRetryInterval);
+ } catch (InterruptedException ignore) {
+ }
+
+ // give reconnect a chance
+ Thread.yield();
+
+ continue;
+ } catch (NullPointerException jmse) {
+
+ // we ARE reconnecting
+ try {
+ m_subscriptionLock.wait(m_interactRetryInterval);
+ } catch (InterruptedException ignore) {
+ }
+
+ // give reconnect a chance
+ Thread.yield();
+
+ continue;
+ }
+ }
+ }
+ }
+
+ void unsubscribe(Subscription subscription) {
+ long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+
+ synchronized (m_subscriptionLock) {
+ if (!m_subscriptions.containsKey(subscription)) {
+ return;
+ }
+
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Cannot unsubscribe listener");
+ }
+
+ // give reconnect a chance
+ Thread.yield();
+
+ try {
+ ListenerSession session =
+ (ListenerSession) m_subscriptions.get(subscription);
+
+ session.cleanup();
+ m_subscriptions.remove(subscription);
+
+ break;
+ } catch (NullPointerException jmse) {
+
+ // we are reconnecting
+ try {
+ m_subscriptionLock.wait(m_interactRetryInterval);
+ } catch (InterruptedException ignore) {
+ }
+
+ continue;
+ }
+ }
+ }
+ }
+
+ protected class ListenerSession extends ConnectorSession {
+ protected MessageConsumer m_consumer;
+ protected Subscription m_subscription;
+
+ ListenerSession(Session session, MessageConsumer consumer, Subscription subscription)
+ throws Exception {
+ super(session);
+ m_subscription = subscription;
+ m_consumer = consumer;
+
+ Destination destination = subscription.m_endpoint.getDestination(m_session);
+
+ m_consumer.setMessageListener(subscription.m_listener);
+ }
+
+ void cleanup() {
+ try {
+ m_consumer.close();
+ } catch (Exception ignore) {
+ }
+
+ try {
+ m_session.close();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ }
+
+
+ private abstract class Connection extends Thread implements ExceptionListener {
+ private String m_clientID;
+ protected javax.jms.Connection m_connection;
+ private ConnectionFactory m_connectionFactory;
+ protected boolean m_isActive;
+ private Object m_jmsLock;
+ private Object m_lifecycleLock;
+ private boolean m_needsToConnect;
+ private String m_password;
+ private boolean m_startConnection;
+ private String m_username;
+
+ protected Connection(ConnectionFactory connectionFactory, javax.jms.Connection connection,
+ String threadName, String clientID, String username, String password)
+ throws JMSException {
+ super(threadName);
+ m_connectionFactory = connectionFactory;
+ m_clientID = clientID;
+ m_username = username;
+ m_password = password;
+ m_jmsLock = new Object();
+ m_lifecycleLock = new Object();
+
+ if (connection != null) {
+ m_needsToConnect = false;
+ m_connection = connection;
+ m_connection.setExceptionListener(this);
+
+ if (m_clientID != null) {
+ m_connection.setClientID(m_clientID);
+ }
+ } else {
+ m_needsToConnect = true;
+ }
+
+ m_isActive = true;
+ }
+
+ private void internalOnConnect() throws Exception {
+ onConnect();
+
+ synchronized (m_lifecycleLock) {
+ if (m_startConnection) {
+ try {
+ m_connection.start();
+ } catch (Throwable e) {
+ } // ignore
+ }
+ }
+ }
+
+ private void internalOnShutdown() {
+ stopConnection();
+ onShutdown();
+
+ try {
+ m_connection.close();
+ } catch (Throwable e) {
+ } // ignore
+ }
+
+ protected abstract void onConnect() throws Exception;
+
+ protected abstract void onException();
+
+ public void onException(JMSException exception) {
+ if (m_adapter.isRecoverable(exception, JMSVendorAdapter.ON_EXCEPTION_ACTION)) {
+ return;
+ }
+
+ onException();
+
+ synchronized (m_jmsLock) {
+ m_jmsLock.notifyAll();
+ }
+ }
+
+ protected abstract void onShutdown();
+
+ /**
+ *
+ */
+ public void run() {
+ // TODO: handle non-recoverable errors
+ // loop until a connection is made and when a connection is made (re)establish
+ // any subscriptions
+ while (m_isActive) {
+ if (m_needsToConnect) {
+ m_connection = null;
+
+ try {
+ m_connection = internalConnect(m_connectionFactory, m_username, m_password);
+ m_connection.setExceptionListener(this);
+
+ if (m_clientID != null) {
+ m_connection.setClientID(m_clientID);
+ }
+ } catch (JMSException e) {
+
+ // simply backoff for a while and then retry
+ try {
+ Thread.sleep(m_connectRetryInterval);
+ } catch (InterruptedException ie) {
+ }
+
+ continue;
+ }
+ } else {
+ m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because
+ }
+
+ // we lost the connection
+
+ // we now have a valid connection so establish some context
+ try {
+ internalOnConnect();
+ } catch (Exception e) {
+
+ // insert code to handle non recoverable errors
+ // simply retry
+ continue;
+ }
+
+ synchronized (m_jmsLock) {
+ try {
+ m_jmsLock.wait();
+ } catch (InterruptedException ie) {
+ } // until notified due to some change in status
+ }
+ }
+
+ // no longer staying connected, so see what we can cleanup
+ internalOnShutdown();
+ }
+
+ void shutdown() {
+ m_isActive = false;
+
+ synchronized (m_jmsLock) {
+ m_jmsLock.notifyAll();
+ }
+ }
+
+ void startConnection() {
+ synchronized (m_lifecycleLock) {
+ if (m_startConnection) {
+ return;
+ }
+
+ m_startConnection = true;
+
+ try {
+ m_connection.start();
+ } catch (Throwable e) {
+ } // ignore
+ }
+ }
+
+ void stopConnection() {
+ synchronized (m_lifecycleLock) {
+ if (!m_startConnection) {
+ return;
+ }
+
+ m_startConnection = false;
+
+ try {
+ m_connection.stop();
+ } catch (Throwable e) {
+ } // ignore
+ }
+ }
+
+ public String getClientID() {
+ return m_clientID;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return m_connectionFactory;
+ }
+
+ public String getPassword() {
+ return m_password;
+ }
+
+ public String getUsername() {
+ return m_username;
+ }
+ }
+
+
+ private abstract class ConnectorSession {
+ Session m_session;
+
+ ConnectorSession(Session session) throws JMSException {
+ m_session = session;
+ }
+ }
+
+
+ protected abstract class SyncConnection extends Connection {
+ int m_numSessions;
+ Object m_senderLock;
+ LinkedList m_senders;
+
+ SyncConnection(ConnectionFactory connectionFactory, javax.jms.Connection connection,
+ int numSessions, String threadName, String clientID, String username,
+ String password)
+ throws JMSException {
+ super(connectionFactory, connection, threadName, clientID, username, password);
+ m_senders = new LinkedList();
+ m_numSessions = numSessions;
+ m_senderLock = new Object();
+ }
+
+ byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
+ throws Exception {
+ long timeoutTime = System.currentTimeMillis() + timeout;
+
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Unable to complete call in time allotted");
+ }
+
+ SendSession sendSession = null;
+
+ try {
+ sendSession = getSessionFromPool(m_poolTimeout);
+
+ byte[] response = sendSession.call(endpoint, message,
+ timeoutTime - System.currentTimeMillis(),
+ properties);
+
+ returnSessionToPool(sendSession);
+
+ if (response == null) {
+ throw new InvokeTimeoutException(
+ "Unable to complete call in time allotted");
+ }
+
+ return response;
+ } catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
+
+ // this we cannot recover from
+ // but it does not invalidate the session
+ returnSessionToPool(sendSession);
+
+ throw jmse;
+ }
+
+ // for now we will assume this is a reconnect related issue
+ // and let the sender be collected
+ // give the reconnect thread a chance to fill the pool
+ Thread.yield();
+
+ continue;
+ } catch (NullPointerException npe) {
+ Thread.yield();
+
+ continue;
+ }
+ }
+ }
+
+ protected abstract SendSession createSendSession(javax.jms.Connection connection)
+ throws JMSException;
+
+ protected void onConnect() throws JMSException {
+ synchronized (m_senderLock) {
+ for (int i = 0; i < m_numSessions; i++) {
+ m_senders.add(createSendSession(m_connection));
+ }
+
+ m_senderLock.notifyAll();
+ }
+ }
+
+ protected void onException() {
+ synchronized (m_senderLock) {
+ m_senders.clear();
+ }
+ }
+
+ protected void onShutdown() {
+ synchronized (m_senderLock) {
+ Iterator senders = m_senders.iterator();
+
+ while (senders.hasNext()) {
+ SendSession session = (SendSession) senders.next();
+
+ session.cleanup();
+ }
+
+ m_senders.clear();
+ }
+ }
+
+ private void returnSessionToPool(SendSession sendSession) {
+ synchronized (m_senderLock) {
+ m_senders.addLast(sendSession);
+ m_senderLock.notifyAll();
+ }
+ }
+
+ /**
+ *
+ */
+ void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception {
+ // TODO add in handling for security exceptions
+ // TODO add support for timeouts
+ long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Cannot complete send in time allotted");
+ }
+
+ SendSession sendSession = null;
+
+ try {
+ sendSession = getSessionFromPool(m_poolTimeout);
+ sendSession.send(endpoint, message, properties);
+ returnSessionToPool(sendSession);
+ } catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
+
+ // this we cannot recover from
+ // but it does not invalidate the session
+ returnSessionToPool(sendSession);
+
+ throw jmse;
+ }
+
+ // for now we will assume this is a reconnect related issue
+ // and let the sender be collected
+ // give the reconnect thread a chance to fill the pool
+ Thread.yield();
+
+ continue;
+ } catch (NullPointerException npe) {
+
+ // give the reconnect thread a chance to fill the pool
+ Thread.yield();
+
+ continue;
+ }
+
+ break;
+ }
+ }
+
+ private SendSession getSessionFromPool(long timeout) {
+ synchronized (m_senderLock) {
+ while (m_senders.size() == 0) {
+ try {
+ m_senderLock.wait(timeout);
+
+ if (m_senders.size() == 0) {
+ return null;
+ }
+ } catch (InterruptedException ignore) {
+ return null;
+ }
+ }
+
+ return (SendSession) m_senders.removeFirst();
+ }
+ }
+
+ protected abstract class SendSession extends ConnectorSession {
+ MessageProducer m_producer;
+
+ SendSession(Session session, MessageProducer producer) throws JMSException {
+ super(session);
+ m_producer = producer;
+ }
+
+ byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
+ throws Exception {
+ Destination reply = createTemporaryDestination();
+ MessageConsumer subscriber = createConsumer(reply);
+ BytesMessage jmsMessage = m_session.createBytesMessage();
+
+ jmsMessage.writeBytes(message);
+ jmsMessage.setJMSReplyTo(reply);
+
+ int deliveryMode = extractDeliveryMode(properties);
+ int priority = extractPriority(properties);
+ long timeToLive = extractTimeToLive(properties);
+
+ if ((properties != null) && !properties.isEmpty()) {
+ setProperties(properties, jmsMessage);
+ }
+
+ send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority,
+ timeToLive);
+
+ BytesMessage response = null;
+
+ try {
+ response = (BytesMessage) subscriber.receive(timeout);
+ } catch (ClassCastException cce) {
+ throw new InvokeException(
+ "Error: unexpected message type received - expected BytesMessage");
+ }
+
+ byte[] respBytes = null;
+
+ if (response != null) {
+ byte[] buffer = new byte[8 * 1024];
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ for (int bytesRead = response.readBytes(buffer); bytesRead != -1;
+ bytesRead = response.readBytes(buffer)) {
+ out.write(buffer, 0, bytesRead);
+ }
+
+ respBytes = out.toByteArray();
+ }
+
+ subscriber.close();
+ deleteTemporaryDestination(reply);
+
+ return respBytes;
+ }
+
+ void cleanup() {
+ try {
+ m_producer.close();
+ } catch (Throwable t) {
+ }
+
+ try {
+ m_session.close();
+ } catch (Throwable t) {
+ }
+ }
+
+ protected abstract MessageConsumer createConsumer(Destination destination)
+ throws JMSException;
+
+ protected abstract Destination createTemporaryDestination() throws JMSException;
+
+ protected abstract void deleteTemporaryDestination(Destination destination)
+ throws JMSException;
+
+ private int extractDeliveryMode(HashMap properties) {
+ return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
+ JMSConstants.DEFAULT_DELIVERY_MODE);
+ }
+
+ private int extractPriority(HashMap properties) {
+ return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
+ JMSConstants.DEFAULT_PRIORITY);
+ }
+
+ private long extractTimeToLive(HashMap properties) {
+ return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
+ JMSConstants.DEFAULT_TIME_TO_LIVE);
+ }
+
+ void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception {
+ BytesMessage jmsMessage = m_session.createBytesMessage();
+
+ jmsMessage.writeBytes(message);
+
+ int deliveryMode = extractDeliveryMode(properties);
+ int priority = extractPriority(properties);
+ long timeToLive = extractTimeToLive(properties);
+
+ if ((properties != null) && !properties.isEmpty()) {
+ setProperties(properties, jmsMessage);
+ }
+
+ send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority,
+ timeToLive);
+ }
+
+ protected abstract void send(Destination destination, Message message,
+ int deliveryMode, int priority, long timeToLive)
+ throws JMSException;
+
+ private void setProperties(HashMap properties, Message message) throws JMSException {
+ Iterator propertyIter = properties.entrySet().iterator();
+
+ while (propertyIter.hasNext()) {
+ Map.Entry property = (Map.Entry) propertyIter.next();
+
+ setProperty((String) property.getKey(), property.getValue(), message);
+ }
+ }
+
+ private void setProperty(String property, Object value, Message message)
+ throws JMSException {
+ if (property == null) {
+ return;
+ }
+
+ if (property.equals(JMSConstants.JMS_CORRELATION_ID)) {
+ message.setJMSCorrelationID((String) value);
+ } else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES)) {
+ message.setJMSCorrelationIDAsBytes((byte[]) value);
+ } else if (property.equals(JMSConstants.JMS_TYPE)) {
+ message.setJMSType((String) value);
+ } else {
+ message.setObjectProperty(property, value);
+ }
+ }
+ }
+ }
+}
Modified: webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/SOAPElement.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/SOAPElement.java?rev=366421&r1=366420&r2=366421&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/SOAPElement.java (original)
+++ webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/SOAPElement.java Thu Jan 5 20:26:39 2006
@@ -30,7 +30,6 @@
/**
* @param parent
- * @param parent
*/
protected SOAPElement(OMElement parent,
String localName,
Modified: webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/SOAPFaultCodeImpl.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/SOAPFaultCodeImpl.java?rev=366421&r1=366420&r2=366421&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/SOAPFaultCodeImpl.java (original)
+++ webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/SOAPFaultCodeImpl.java Thu Jan 5 20:26:39 2006
@@ -35,8 +35,6 @@
/**
* Constructor OMElementImpl
*
- * @param localName
- * @param ns
* @param parent
* @param builder
*/
@@ -45,7 +43,6 @@
}
/**
- * @param parent
* @param parent
*/
public SOAPFaultCodeImpl(SOAPFault parent,
Modified: webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap11/SOAP11FaultCodeImpl.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap11/SOAP11FaultCodeImpl.java?rev=366421&r1=366420&r2=366421&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap11/SOAP11FaultCodeImpl.java (original)
+++ webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap11/SOAP11FaultCodeImpl.java Thu Jan 5 20:26:39 2006
@@ -34,8 +34,6 @@
/**
* Constructor OMElementImpl
*
- * @param localName
- * @param ns
* @param parent
* @param builder
*/
@@ -44,7 +42,6 @@
}
/**
- * @param parent
* @param parent
*/
public SOAP11FaultCodeImpl(SOAPFault parent) throws SOAPProcessingException {
Modified: webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap11/SOAP11FaultImpl.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap11/SOAP11FaultImpl.java?rev=366421&r1=366420&r2=366421&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap11/SOAP11FaultImpl.java (original)
+++ webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap11/SOAP11FaultImpl.java Thu Jan 5 20:26:39 2006
@@ -45,7 +45,6 @@
* This is a convenience method for the SOAP Fault Impl.
*
* @param parent
- * @param e
*/
public SOAP11FaultImpl(SOAPBody parent) throws SOAPProcessingException {
super(parent);
Modified: webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap12/SOAP12FaultCodeImpl.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap12/SOAP12FaultCodeImpl.java?rev=366421&r1=366420&r2=366421&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap12/SOAP12FaultCodeImpl.java (original)
+++ webservices/axis2/trunk/java/modules/doom/src/org/apache/axis2/soap/impl/dom/soap12/SOAP12FaultCodeImpl.java Thu Jan 5 20:26:39 2006
@@ -28,8 +28,6 @@
/**
* Constructor OMElementImpl
*
- * @param localName
- * @param ns
* @param parent
* @param builder
*/
@@ -38,7 +36,6 @@
}
/**
- * @param parent
* @param parent
*/
public SOAP12FaultCodeImpl(SOAPFault parent) throws SOAPProcessingException {
Modified: webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/handler/config/InflowConfiguration.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/handler/config/InflowConfiguration.java?rev=366421&r1=366420&r2=366421&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/handler/config/InflowConfiguration.java (original)
+++ webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/handler/config/InflowConfiguration.java Thu Jan 5 20:26:39 2006
@@ -38,7 +38,7 @@
/**
* Returns the configuration as an OMElement
- * @return
+ * @return Returns Parameter.
*/
public Parameter getProperty() {
OMFactory fac = OMAbstractFactory.getOMFactory();
@@ -71,8 +71,8 @@
}
/**
- * Returns the action items
- * @return
+ * Returns the action items.
+ * @return Returns String.
*/
public String getActionItems() {
return (String)this.action.get(WSSHandlerConstants.ACTION_ITEMS);
@@ -120,7 +120,7 @@
/**
* Returns the signature property file
- * @return
+ * @return Returns String.
*/
public String getSignaturePropFile() {
return (String)this.action.get(WSHandlerConstants.SIG_PROP_FILE);
@@ -136,7 +136,7 @@
/**
* Sets whether signature confirmation should be enabled or not
- * @param embeddedKeyName
+ * @param value
*/
public void setEnableSignatureConfirmation(boolean value) {
this.action.put(
@@ -145,7 +145,7 @@
/**
* Returns whether signature confirmation should be enabled or not
- * @return
+ * @return Returns String.
*/
public String getEnableSignatureConfirmation() {
return (String) this.action
Modified: webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/trust/token/IssueRequestSecurityToken.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/trust/token/IssueRequestSecurityToken.java?rev=366421&r1=366420&r2=366421&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/trust/token/IssueRequestSecurityToken.java (original)
+++ webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/trust/token/IssueRequestSecurityToken.java Thu Jan 5 20:26:39 2006
@@ -45,7 +45,7 @@
}
/**
- * Sets the value of the <code>AppliesTo</code> element
+ * Sets the value of the <code>AppliesTo</code> element.
* @param value
*/
public void setAppliesTo(OMElement value) {
@@ -58,8 +58,8 @@
}
/**
- * Returns the first child of the <code>AppliesTo</code> element
- * @return
+ * Returns the first child of the <code>AppliesTo</code> element.
+ * @return Returns OMElement.
*/
public OMElement getAppliesTo() {
if(this.appliesTo != null) {
@@ -115,8 +115,8 @@
}
/**
- * Retuns the <code>Claims</code> element
- * @return
+ * Returns the <code>Claims</code> element.
+ * @return Returns Claims.
*/
public Claims getClaims() {
return this.claims;
@@ -142,7 +142,7 @@
/**
* Sets the binary secret of the Entropy element when the its of type <code>Nonce</code>
* @see BinarySecret#NONCE_VAL
- * @param entropyValue The nonce value
+ * @param nonceValue The nonce value
*/
public void setEntropyNonce(String nonceValue) {
this.setEntropy(Constants.BINARY_SECRET_TYPE.NONCE_VAL, nonceValue);
@@ -150,7 +150,7 @@
/**
* Returns the <code>Entropy</code> element
- * @return
+ * @return Returns Entropy.
*/
public Entropy getEntropy() {
return this.entropy;
@@ -160,7 +160,7 @@
/**
* Adds a <code>wst:Lifetime</code> element with the given duration to the
* <code>wst:RequestSecurityToken</code>
- * @param duration
+ * @param lifetimeInMillis
*/
public void setLifetime(long lifetimeInMillis) {
if(this.lifetime != null) {
@@ -173,7 +173,7 @@
/**
* Retuns the <code>Lifetime</code> element
- * @return
+ * @return Returns Lifetime.
*/
public Lifetime getLifetime() {
return this.lifetime;
@@ -193,7 +193,7 @@
/**
* Retuns the <code>KeySize</code> element
- * @return
+ * @return Returns KeySize.
*/
public KeySize getKeySize() {
return this.keySize;
@@ -216,7 +216,7 @@
/**
* Returns the <code>Renewing</code> element
- * @return
+ * @return Returns Renewing.
*/
public Renewing getRenewing() {
return this.renewing;
Modified: webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/trust/token/RequestSecurityToken.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/trust/token/RequestSecurityToken.java?rev=366421&r1=366420&r2=366421&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/trust/token/RequestSecurityToken.java (original)
+++ webservices/axis2/trunk/java/modules/security/src/org/apache/axis2/security/trust/token/RequestSecurityToken.java Thu Jan 5 20:26:39 2006
@@ -66,7 +66,7 @@
/**
* Returns the request type if it is set
- * @return
+ * @return Returns String.
*/
public String getRequestType() {
if(this.requestTypeElement != null) {
@@ -90,7 +90,7 @@
/**
* Returns the token type is set
- * @return
+ * @return Returns String.
*/
public String getTokenType() {
if(this.tokenTypeElement != null) {
@@ -125,8 +125,8 @@
/**
* Returns the requested token if available
- * @param token
- * @return
+ * @param tokenQName
+ * @return Returns OMElement.
*/
public OMElement getToken(QName tokenQName) {
return this.tokenElement.getFirstChildWithName(tokenQName);
@@ -165,7 +165,7 @@
* custom attrbutes added to the
* <code>wst:RequestSecyrityToken</code>
* @param attribute
- * @return
+ * @return Returns String.
*/
public String getAttributeValue(QName attribute) {
return this.tokenElement.getAttribute(attribute).getAttributeValue();