You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by di...@apache.org on 2005/12/05 22:38:38 UTC
svn commit: r354198 [1/3] -
/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/
Author: dims
Date: Mon Dec 5 13:38:30 2005
New Revision: 354198
URL: http://svn.apache.org/viewcvs?rev=354198&view=rev
Log:
Initial cut, compiles ok, have run a single thing yet.
Added:
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSEndpoint.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSSender.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSTransport.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSURLConnection.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSURLHelper.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSVendorAdapter.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSVendorAdapterFactory.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JNDIVendorAdapter.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/MapUtils.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java
webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.axis2.util.Loader;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnectionFactory;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.HashMap;
+
+/**
+ * Uses ClassUtils.forName and reflection to configure ConnectionFactory. Uses
+ * the input sessions to create destinations.
+ */
+public abstract class BeanVendorAdapter extends JMSVendorAdapter {
+ protected final static String CONNECTION_FACTORY_CLASS =
+ "transport.jms.ConnectionFactoryClass";
+
+ public QueueConnectionFactory getQueueConnectionFactory(HashMap cfConfig)
+ throws Exception {
+ return (QueueConnectionFactory) getConnectionFactory(cfConfig);
+ }
+
+ public TopicConnectionFactory getTopicConnectionFactory(HashMap cfConfig)
+ throws Exception {
+ return (TopicConnectionFactory) getConnectionFactory(cfConfig);
+ }
+
+ private ConnectionFactory getConnectionFactory(HashMap cfConfig)
+ throws Exception {
+ String classname = (String) cfConfig.get(CONNECTION_FACTORY_CLASS);
+ if (classname == null || classname.trim().length() == 0)
+ throw new IllegalArgumentException("noCFClass");
+
+ Class factoryClass = Loader.loadClass(classname);
+ ConnectionFactory factory = (ConnectionFactory) factoryClass.newInstance();
+ callSetters(cfConfig, factoryClass, factory);
+ return factory;
+ }
+
+ private void callSetters(HashMap cfConfig,
+ Class factoryClass,
+ ConnectionFactory factory)
+ throws Exception {
+ PropertyDescriptor[] bpd = Introspector.getBeanInfo(factoryClass).getPropertyDescriptors();
+ for (int i = 0; i < bpd.length; i++) {
+ PropertyDescriptor thisBPD = bpd[i];
+ String propName = thisBPD.getName();
+ if (cfConfig.containsKey(propName)) {
+ Object value = cfConfig.get(propName);
+ if (value == null)
+ continue;
+
+ String validType = thisBPD.getName();
+ if (!value.getClass().getName().equals(validType))
+ throw new IllegalArgumentException("badType");
+ if (thisBPD.getWriteMethod() == null)
+ throw new IllegalArgumentException("notWriteable");
+ thisBPD.getWriteMethod().invoke(factory, new Object[]{value});
+ }
+ }
+ }
+}
\ No newline at end of file
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import java.net.URL;
+import java.net.URLConnection;
+
+/**
+ * URLStreamHandler for the "jms" protocol
+ */
+public class Handler
+ extends java.net.URLStreamHandler {
+ static {
+ // register the JMSTransport class
+ //org.apache.axis.client.Call.setTransportForProtocol(JMSConstants.PROTOCOL, org.apache.axis2.transport.jms.JMSTransport.class);
+ }
+
+ /**
+ * Reassembles the URL string, in the form "jms:/<dest>?prop1=value1&prop2=value2&..."
+ */
+ protected String toExternalForm(URL url) {
+
+ String destination = url.getPath().substring(1);
+ String query = url.getQuery();
+
+ StringBuffer jmsurl = new StringBuffer(JMSConstants.PROTOCOL + ":/");
+ jmsurl.append(destination).append("?").append(query);
+
+ return jmsurl.toString();
+ }
+
+ protected URLConnection openConnection(URL url) {
+ return new JMSURLConnection(url);
+ }
+}
\ No newline at end of file
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+/**
+ * The <code>InvokeException</code> is thrown when a method encounters a
+ * general exception in the course of processing.
+ */
+public class InvokeException extends RuntimeException {
+ public InvokeException(String message) {
+ super(message);
+ }
+}
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+/**
+ * The <code>InvokeTimeoutException</code> is thrown when a method cannot
+ * complete processing within the time allotted. This occurs most often
+ * when the broker has failed and the client is unable to reconnect. This
+ * may be thrown from any method within the wsclient that interacts with the
+ * broker. The timeout is defined within the environment parameter to
+ * <code>createConnector</code> method in <code>JMSConnectorFactory</code>
+ * The key in the table is <code>JMSConstants.INTERACT_TIMEOUT_TIME</code>
+ */
+public class InvokeTimeoutException extends InvokeException {
+ public InvokeTimeoutException(String message) {
+ super(message);
+ }
+}
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,883 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * JMSConnector is an abstract class that encapsulates the work of connecting
+ * to JMS destinations. Its subclasses are TopicConnector and QueueConnector
+ * which further specialize connections to the pub-sub and the ptp domains.
+ * It also implements the capability to retry connections in the event of
+ * failures.
+ */
+public abstract class JMSConnector {
+ protected int m_numRetries;
+ protected long m_connectRetryInterval;
+ protected long m_interactRetryInterval;
+ protected long m_timeoutTime;
+ protected long m_poolTimeout;
+ protected AsyncConnection m_receiveConnection;
+ protected SyncConnection m_sendConnection;
+ protected int m_numSessions;
+ protected boolean m_allowReceive;
+ protected JMSVendorAdapter m_adapter;
+ protected JMSURLHelper m_jmsurl;
+
+ public JMSConnector(ConnectionFactory connectionFactory,
+ int numRetries,
+ int numSessions,
+ long connectRetryInterval,
+ long interactRetryInterval,
+ long timeoutTime,
+ boolean allowReceive,
+ String clientID,
+ String username,
+ String password,
+ JMSVendorAdapter adapter,
+ JMSURLHelper jmsurl)
+ throws JMSException {
+ m_numRetries = numRetries;
+ m_connectRetryInterval = connectRetryInterval;
+ m_interactRetryInterval = interactRetryInterval;
+ m_timeoutTime = timeoutTime;
+ m_poolTimeout = timeoutTime / (long) numRetries;
+ m_numSessions = numSessions;
+ m_allowReceive = allowReceive;
+ m_adapter = adapter;
+ m_jmsurl = jmsurl;
+
+ // try to connect initially so we can fail fast
+ // in the case of irrecoverable errors.
+ // If we fail in a recoverable fashion we will retry
+ javax.jms.Connection sendConnection = createConnectionWithRetry(
+ connectionFactory,
+ username,
+ password);
+ m_sendConnection = createSyncConnection(connectionFactory, sendConnection,
+ m_numSessions, "SendThread",
+ clientID,
+ username,
+ password);
+
+ m_sendConnection.start();
+
+ if (m_allowReceive) {
+ javax.jms.Connection receiveConnection = createConnectionWithRetry(
+ connectionFactory,
+ username,
+ password);
+ m_receiveConnection = createAsyncConnection(connectionFactory,
+ receiveConnection,
+ "ReceiveThread",
+ clientID,
+ username,
+ password);
+ m_receiveConnection.start();
+ }
+ }
+
+ public int getNumRetries() {
+ return m_numRetries;
+ }
+
+ public int numSessions() {
+ return m_numSessions;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ // there is always a send connection
+ return getSendConnection().getConnectionFactory();
+ }
+
+ public String getClientID() {
+ return getSendConnection().getClientID();
+ }
+
+ public String getUsername() {
+ return getSendConnection().getUsername();
+ }
+
+ public String getPassword() {
+ return getSendConnection().getPassword();
+ }
+
+ public JMSVendorAdapter getVendorAdapter() {
+ return m_adapter;
+ }
+
+ public JMSURLHelper getJMSURL() {
+ return m_jmsurl;
+ }
+
+ protected javax.jms.Connection createConnectionWithRetry(
+ ConnectionFactory connectionFactory,
+ String username,
+ String password)
+ throws JMSException {
+ javax.jms.Connection connection = null;
+ for (int numTries = 1; connection == null; numTries++) {
+ try {
+ connection = internalConnect(connectionFactory, username, password);
+ }
+ catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION) || numTries == m_numRetries)
+ throw jmse;
+ else
+ try {
+ Thread.sleep(m_connectRetryInterval);
+ } catch (InterruptedException ie) {
+ }
+ ;
+ }
+ }
+ return connection;
+ }
+
+ public void stop() {
+ JMSConnectorManager.getInstance().removeConnectorFromPool(this);
+
+ m_sendConnection.stopConnection();
+ if (m_allowReceive)
+ m_receiveConnection.stopConnection();
+ }
+
+ public void start() {
+ m_sendConnection.startConnection();
+ if (m_allowReceive)
+ m_receiveConnection.startConnection();
+
+ JMSConnectorManager.getInstance().addConnectorToPool(this);
+ }
+
+ public void shutdown() {
+ m_sendConnection.shutdown();
+ if (m_allowReceive)
+ m_receiveConnection.shutdown();
+ }
+
+ public abstract JMSEndpoint createEndpoint(String destinationName)
+ throws JMSException;
+
+ public abstract JMSEndpoint createEndpoint(Destination destination)
+ throws JMSException;
+
+
+ protected abstract javax.jms.Connection internalConnect(
+ ConnectionFactory connectionFactory,
+ String username,
+ String password)
+ throws JMSException;
+
+ private abstract class Connection extends Thread implements ExceptionListener {
+ private ConnectionFactory m_connectionFactory;
+ protected javax.jms.Connection m_connection;
+
+ protected boolean m_isActive;
+ private boolean m_needsToConnect;
+ private boolean m_startConnection;
+ private String m_clientID;
+ private String m_username;
+ private String m_password;
+
+ private Object m_jmsLock;
+ private Object m_lifecycleLock;
+
+ protected Connection(ConnectionFactory connectionFactory,
+ javax.jms.Connection connection,
+ String threadName,
+ String clientID,
+ String username,
+ String password)
+ throws JMSException {
+ super(threadName);
+ m_connectionFactory = connectionFactory;
+
+ m_clientID = clientID;
+ m_username = username;
+ m_password = password;
+
+ m_jmsLock = new Object();
+ m_lifecycleLock = new Object();
+
+ if (connection != null) {
+ m_needsToConnect = false;
+ m_connection = connection;
+ m_connection.setExceptionListener(this);
+ if (m_clientID != null)
+ m_connection.setClientID(m_clientID);
+ } else {
+ m_needsToConnect = true;
+ }
+
+ m_isActive = true;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return m_connectionFactory;
+ }
+
+ public String getClientID() {
+ return m_clientID;
+ }
+
+ public String getUsername() {
+ return m_username;
+ }
+
+ public String getPassword() {
+ return m_password;
+ }
+
+ /**
+ * @todo handle non-recoverable errors
+ */
+
+ public void run() {
+ // loop until a connection is made and when a connection is made (re)establish
+ // any subscriptions
+ while (m_isActive) {
+ if (m_needsToConnect) {
+ m_connection = null;
+ try {
+ m_connection = internalConnect(m_connectionFactory,
+ m_username, m_password);
+ m_connection.setExceptionListener(this);
+ if (m_clientID != null)
+ m_connection.setClientID(m_clientID);
+ }
+ catch (JMSException e) {
+ // simply backoff for a while and then retry
+ try {
+ Thread.sleep(m_connectRetryInterval);
+ } catch (InterruptedException ie) {
+ }
+ continue;
+ }
+ } else
+ m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because
+ // we lost the connection
+
+ // we now have a valid connection so establish some context
+ try {
+ internalOnConnect();
+ }
+ catch (Exception e) {
+ // insert code to handle non recoverable errors
+ // simply retry
+ continue;
+ }
+
+ synchronized (m_jmsLock) {
+ try {
+ m_jmsLock.wait();
+ } catch (InterruptedException ie) {
+ } // until notified due to some change in status
+ }
+ }
+
+ // no longer staying connected, so see what we can cleanup
+ internalOnShutdown();
+ }
+
+
+ void startConnection() {
+ synchronized (m_lifecycleLock) {
+ if (m_startConnection)
+ return;
+ m_startConnection = true;
+ try {
+ m_connection.start();
+ } catch (Throwable e) {
+ } // ignore
+ }
+ }
+
+ void stopConnection() {
+ synchronized (m_lifecycleLock) {
+ if (!m_startConnection)
+ return;
+ m_startConnection = false;
+ try {
+ m_connection.stop();
+ } catch (Throwable e) {
+ } // ignore
+ }
+ }
+
+ void shutdown() {
+ m_isActive = false;
+ synchronized (m_jmsLock) {
+ m_jmsLock.notifyAll();
+ }
+ }
+
+
+ public void onException(JMSException exception) {
+ if (m_adapter.isRecoverable(exception,
+ JMSVendorAdapter.ON_EXCEPTION_ACTION))
+ return;
+ onException();
+ synchronized (m_jmsLock) {
+ m_jmsLock.notifyAll();
+ }
+ }
+
+ private final void internalOnConnect()
+ throws Exception {
+ onConnect();
+ synchronized (m_lifecycleLock) {
+ if (m_startConnection) {
+ try {
+ m_connection.start();
+ } catch (Throwable e) {
+ } // ignore
+ }
+ }
+ }
+
+ private final void internalOnShutdown() {
+ stopConnection();
+ onShutdown();
+ try {
+ m_connection.close();
+ } catch (Throwable e) {
+ } // ignore
+ }
+
+ protected abstract void onConnect() throws Exception;
+
+ protected abstract void onShutdown();
+
+ protected abstract void onException();
+ }
+
+ protected abstract SyncConnection createSyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection,
+ int numSessions,
+ String threadName,
+ String clientID,
+ String username,
+ String password)
+
+ throws JMSException;
+
+ SyncConnection getSendConnection() {
+ return m_sendConnection;
+ }
+
+ protected abstract class SyncConnection extends Connection {
+ LinkedList m_senders;
+ int m_numSessions;
+ Object m_senderLock;
+
+ SyncConnection(ConnectionFactory connectionFactory,
+ javax.jms.Connection connection,
+ int numSessions,
+ String threadName,
+ String clientID,
+ String username,
+ String password)
+ throws JMSException {
+ super(connectionFactory, connection, threadName,
+ clientID, username, password);
+ m_senders = new LinkedList();
+ m_numSessions = numSessions;
+ m_senderLock = new Object();
+ }
+
+ protected abstract SendSession createSendSession(javax.jms.Connection connection)
+ throws JMSException;
+
+ protected void onConnect()
+ throws JMSException {
+ synchronized (m_senderLock) {
+ for (int i = 0; i < m_numSessions; i++) {
+ m_senders.add(createSendSession(m_connection));
+ }
+ m_senderLock.notifyAll();
+ }
+ }
+
+ byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
+ throws Exception {
+ long timeoutTime = System.currentTimeMillis() + timeout;
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Unable to complete call in time allotted");
+ }
+
+ SendSession sendSession = null;
+ try {
+ sendSession = getSessionFromPool(m_poolTimeout);
+ byte[] response = sendSession.call(endpoint,
+ message,
+ timeoutTime - System.currentTimeMillis(),
+ properties);
+ returnSessionToPool(sendSession);
+ if (response == null) {
+ throw new InvokeTimeoutException("Unable to complete call in time allotted");
+ }
+ return response;
+ }
+ catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
+ //this we cannot recover from
+ //but it does not invalidate the session
+ returnSessionToPool(sendSession);
+ throw jmse;
+ }
+
+ //for now we will assume this is a reconnect related issue
+ //and let the sender be collected
+ //give the reconnect thread a chance to fill the pool
+ Thread.yield();
+ continue;
+ }
+ catch (NullPointerException npe) {
+ Thread.yield();
+ continue;
+ }
+ }
+ }
+
+ /**
+ * @todo add in handling for security exceptions
+ * @todo add support for timeouts
+ */
+ void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
+ throws Exception {
+ long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Cannot complete send in time allotted");
+ }
+
+ SendSession sendSession = null;
+ try {
+ sendSession = getSessionFromPool(m_poolTimeout);
+ sendSession.send(endpoint, message, properties);
+ returnSessionToPool(sendSession);
+ }
+ catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
+ //this we cannot recover from
+ //but it does not invalidate the session
+ returnSessionToPool(sendSession);
+ throw jmse;
+ }
+ //for now we will assume this is a reconnect related issue
+ //and let the sender be collected
+ //give the reconnect thread a chance to fill the pool
+ Thread.yield();
+ continue;
+ }
+ catch (NullPointerException npe) {
+ //give the reconnect thread a chance to fill the pool
+ Thread.yield();
+ continue;
+ }
+ break;
+ }
+ }
+
+ protected void onException() {
+ synchronized (m_senderLock) {
+ m_senders.clear();
+ }
+ }
+
+ protected void onShutdown() {
+ synchronized (m_senderLock) {
+ Iterator senders = m_senders.iterator();
+ while (senders.hasNext()) {
+ SendSession session = (SendSession) senders.next();
+ session.cleanup();
+ }
+ m_senders.clear();
+ }
+ }
+
+ private SendSession getSessionFromPool(long timeout) {
+ synchronized (m_senderLock) {
+ while (m_senders.size() == 0) {
+ try {
+ m_senderLock.wait(timeout);
+ if (m_senders.size() == 0) {
+ return null;
+ }
+ }
+ catch (InterruptedException ignore) {
+ return null;
+ }
+ }
+ return (SendSession) m_senders.removeFirst();
+ }
+ }
+
+ private void returnSessionToPool(SendSession sendSession) {
+ synchronized (m_senderLock) {
+ m_senders.addLast(sendSession);
+ m_senderLock.notifyAll();
+ }
+ }
+
+ protected abstract class SendSession extends ConnectorSession {
+ MessageProducer m_producer;
+
+ SendSession(Session session,
+ MessageProducer producer)
+ throws JMSException {
+ super(session);
+ m_producer = producer;
+ }
+
+ protected abstract Destination createTemporaryDestination()
+ throws JMSException;
+
+ protected abstract void deleteTemporaryDestination(Destination destination)
+ throws JMSException;
+
+ protected abstract MessageConsumer createConsumer(Destination destination)
+ throws JMSException;
+
+ protected abstract void send(Destination destination,
+ Message message,
+ int deliveryMode,
+ int priority,
+ long timeToLive)
+ throws JMSException;
+
+ void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
+ throws Exception {
+ BytesMessage jmsMessage = m_session.createBytesMessage();
+ jmsMessage.writeBytes(message);
+ int deliveryMode = extractDeliveryMode(properties);
+ int priority = extractPriority(properties);
+ long timeToLive = extractTimeToLive(properties);
+
+ if (properties != null && !properties.isEmpty())
+ setProperties(properties, jmsMessage);
+
+ send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
+ priority, timeToLive);
+ }
+
+
+ void cleanup() {
+ try {
+ m_producer.close();
+ } catch (Throwable t) {
+ }
+ try {
+ m_session.close();
+ } catch (Throwable t) {
+ }
+ }
+
+ byte[] call(JMSEndpoint endpoint, byte[] message, long timeout,
+ HashMap properties)
+ throws Exception {
+ Destination reply = createTemporaryDestination();
+ MessageConsumer subscriber = createConsumer(reply);
+ BytesMessage jmsMessage = m_session.createBytesMessage();
+ jmsMessage.writeBytes(message);
+ jmsMessage.setJMSReplyTo(reply);
+
+ int deliveryMode = extractDeliveryMode(properties);
+ int priority = extractPriority(properties);
+ long timeToLive = extractTimeToLive(properties);
+
+ if (properties != null && !properties.isEmpty())
+ setProperties(properties, jmsMessage);
+
+ send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
+ priority, timeToLive);
+ BytesMessage response = null;
+ try {
+ response = (BytesMessage) subscriber.receive(timeout);
+ } catch (ClassCastException cce) {
+ throw new InvokeException
+ ("Error: unexpected message type received - expected BytesMessage");
+ }
+ byte[] respBytes = null;
+ if (response != null) {
+ byte[] buffer = new byte[8 * 1024];
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ for (int bytesRead = response.readBytes(buffer);
+ bytesRead != -1; bytesRead = response.readBytes(buffer)) {
+ out.write(buffer, 0, bytesRead);
+ }
+ respBytes = out.toByteArray();
+ }
+ subscriber.close();
+ deleteTemporaryDestination(reply);
+ return respBytes;
+ }
+
+ private int extractPriority(HashMap properties) {
+ return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
+ JMSConstants.DEFAULT_PRIORITY);
+ }
+
+ private int extractDeliveryMode(HashMap properties) {
+ return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
+ JMSConstants.DEFAULT_DELIVERY_MODE);
+ }
+
+ private long extractTimeToLive(HashMap properties) {
+ return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
+ JMSConstants.DEFAULT_TIME_TO_LIVE);
+ }
+
+ private void setProperties(HashMap properties, Message message)
+ throws JMSException {
+ Iterator propertyIter = properties.entrySet().iterator();
+ while (propertyIter.hasNext()) {
+ Map.Entry property = (Map.Entry) propertyIter.next();
+ setProperty((String) property.getKey(), property.getValue(),
+ message);
+ }
+ }
+
+ private void setProperty(String property, Object value, Message message)
+ throws JMSException {
+ if (property == null)
+ return;
+ if (property.equals(JMSConstants.JMS_CORRELATION_ID))
+ message.setJMSCorrelationID((String) value);
+ else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES))
+ message.setJMSCorrelationIDAsBytes((byte[]) value);
+ else if (property.equals(JMSConstants.JMS_TYPE))
+ message.setJMSType((String) value);
+ else
+ message.setObjectProperty(property, value);
+ }
+ }
+ }
+
+ AsyncConnection getReceiveConnection() {
+ return m_receiveConnection;
+ }
+
+ protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection,
+ String threadName,
+ String clientID,
+ String username,
+ String password)
+
+ throws JMSException;
+
+ protected abstract class AsyncConnection extends Connection {
+ HashMap m_subscriptions;
+ Object m_subscriptionLock;
+
+ protected AsyncConnection(ConnectionFactory connectionFactory,
+ javax.jms.Connection connection,
+ String threadName,
+ String clientID,
+ String username,
+ String password)
+ throws JMSException {
+ super(connectionFactory, connection, threadName,
+ clientID, username, password);
+ m_subscriptions = new HashMap();
+ m_subscriptionLock = new Object();
+ }
+
+ protected abstract ListenerSession createListenerSession(
+ javax.jms.Connection connection,
+ Subscription subscription)
+ throws Exception;
+
+ protected void onShutdown() {
+ synchronized (m_subscriptionLock) {
+ Iterator subscriptions = m_subscriptions.keySet().iterator();
+ while (subscriptions.hasNext()) {
+ Subscription subscription = (Subscription) subscriptions.next();
+ ListenerSession session = (ListenerSession)
+ m_subscriptions.get(subscription);
+ if (session != null) {
+ session.cleanup();
+ }
+
+ }
+ m_subscriptions.clear();
+ }
+ }
+
+ /**
+ * @param subscription
+ * @todo add in security exception propagation
+ */
+ void subscribe(Subscription subscription)
+ throws Exception {
+ long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+ synchronized (m_subscriptionLock) {
+ if (m_subscriptions.containsKey(subscription))
+ return;
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Cannot subscribe listener");
+ }
+
+ try {
+ ListenerSession session = createListenerSession(m_connection,
+ subscription);
+ m_subscriptions.put(subscription, session);
+ break;
+ }
+ catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) {
+ throw jmse;
+ }
+
+ try {
+ m_subscriptionLock.wait(m_interactRetryInterval);
+ }
+ catch (InterruptedException ignore) {
+ }
+ //give reconnect a chance
+ Thread.yield();
+ continue;
+ }
+ catch (NullPointerException jmse) {
+ //we ARE reconnecting
+ try {
+ m_subscriptionLock.wait(m_interactRetryInterval);
+ }
+ catch (InterruptedException ignore) {
+ }
+ //give reconnect a chance
+ Thread.yield();
+ continue;
+ }
+ }
+ }
+ }
+
+ void unsubscribe(Subscription subscription) {
+ long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+ synchronized (m_subscriptionLock) {
+ if (!m_subscriptions.containsKey(subscription))
+ return;
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Cannot unsubscribe listener");
+ }
+
+ //give reconnect a chance
+ Thread.yield();
+ try {
+ ListenerSession session = (ListenerSession)
+ m_subscriptions.get(subscription);
+ session.cleanup();
+ m_subscriptions.remove(subscription);
+ break;
+ }
+ catch (NullPointerException jmse) {
+ //we are reconnecting
+ try {
+ m_subscriptionLock.wait(m_interactRetryInterval);
+ }
+ catch (InterruptedException ignore) {
+ }
+ continue;
+ }
+ }
+ }
+ }
+
+ protected void onConnect()
+ throws Exception {
+ synchronized (m_subscriptionLock) {
+ Iterator subscriptions = m_subscriptions.keySet().iterator();
+ while (subscriptions.hasNext()) {
+ Subscription subscription = (Subscription) subscriptions.next();
+
+ if (m_subscriptions.get(subscription) == null) {
+ m_subscriptions.put(subscription,
+ createListenerSession(m_connection, subscription));
+ }
+ }
+ m_subscriptionLock.notifyAll();
+ }
+ }
+
+ protected void onException() {
+ synchronized (m_subscriptionLock) {
+ Iterator subscriptions = m_subscriptions.keySet().iterator();
+ while (subscriptions.hasNext()) {
+ Subscription subscription = (Subscription) subscriptions.next();
+ m_subscriptions.put(subscription, null);
+ }
+ }
+ }
+
+
+ protected class ListenerSession extends ConnectorSession {
+ protected MessageConsumer m_consumer;
+ protected Subscription m_subscription;
+
+ ListenerSession(Session session,
+ MessageConsumer consumer,
+ Subscription subscription)
+ throws Exception {
+ super(session);
+ m_subscription = subscription;
+ m_consumer = consumer;
+ Destination destination = subscription.m_endpoint.getDestination(m_session);
+ m_consumer.setMessageListener(subscription.m_listener);
+ }
+
+ void cleanup() {
+ try {
+ m_consumer.close();
+ } catch (Exception ignore) {
+ }
+ try {
+ m_session.close();
+ } catch (Exception ignore) {
+ }
+ }
+
+ }
+ }
+
+ private abstract class ConnectorSession {
+ Session m_session;
+
+ ConnectorSession(Session session)
+ throws JMSException {
+ m_session = session;
+ }
+ }
+}
\ No newline at end of file
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+
+/**
+ * JMSConnectorFactory is a factory class for creating JMSConnectors. It can
+ * create both client connectors and server connectors. A server connector
+ * is configured to allow asynchronous message receipt, while a client
+ * connector is not.
+ * <p/>
+ * JMSConnectorFactory can also be used to select an appropriately configured
+ * JMSConnector from an existing pool of connectors.
+ */
+public abstract class JMSConnectorFactory {
+ protected static Log log =
+ LogFactory.getLog(JMSConnectorFactory.class.getName());
+
+ /**
+ * Performs an initial check on the connector properties, and then defers
+ * to the vendor adapter for matching on the vendor-specific connection factory.
+ *
+ * @param connectors the list of potential matches
+ * @param connectorProps the set of properties to be used for matching the connector
+ * @param cfProps the set of properties to be used for matching the connection factory
+ * @param username the user requesting the connector
+ * @param password the password associated with the requesting user
+ * @param adapter the vendor adapter specified in the JMS URL
+ * @return a JMSConnector that matches the specified properties
+ */
+ public static JMSConnector matchConnector(java.util.Set connectors,
+ HashMap connectorProps,
+ HashMap cfProps,
+ String username,
+ String password,
+ JMSVendorAdapter adapter) {
+ java.util.Iterator iter = connectors.iterator();
+ while (iter.hasNext()) {
+ JMSConnector conn = (JMSConnector) iter.next();
+
+ // username
+ String connectorUsername = conn.getUsername();
+ if (!(((connectorUsername == null) && (username == null)) ||
+ ((connectorUsername != null) && (username != null) && (connectorUsername.equals(username)))))
+ continue;
+
+ // password
+ String connectorPassword = conn.getPassword();
+ if (!(((connectorPassword == null) && (password == null)) ||
+ ((connectorPassword != null) && (password != null) && (connectorPassword.equals(password)))))
+ continue;
+
+ // num retries
+ int connectorNumRetries = conn.getNumRetries();
+ String propertyNumRetries = (String) connectorProps.get(JMSConstants.NUM_RETRIES);
+ int numRetries = JMSConstants.DEFAULT_NUM_RETRIES;
+ if (propertyNumRetries != null)
+ numRetries = Integer.parseInt(propertyNumRetries);
+ if (connectorNumRetries != numRetries)
+ continue;
+
+ // client id
+ String connectorClientID = conn.getClientID();
+ String clientID = (String) connectorProps.get(JMSConstants.CLIENT_ID);
+ if (!(((connectorClientID == null) && (clientID == null))
+ ||
+ ((connectorClientID != null) && (clientID != null) && connectorClientID.equals(clientID))))
+ continue;
+
+ // domain
+ String connectorDomain = (conn instanceof QueueConnector) ? JMSConstants.DOMAIN_QUEUE : JMSConstants.DOMAIN_TOPIC;
+ String propertyDomain = (String) connectorProps.get(JMSConstants.DOMAIN);
+ String domain = JMSConstants.DOMAIN_DEFAULT;
+ if (propertyDomain != null)
+ domain = propertyDomain;
+ if (!(((connectorDomain == null) && (domain == null))
+ ||
+ ((connectorDomain != null) && (domain != null) && connectorDomain.equalsIgnoreCase(domain))))
+ continue;
+
+ // the connection factory must also match for the connector to be reused
+ JMSURLHelper jmsurl = conn.getJMSURL();
+ if (adapter.isMatchingConnectionFactory(conn.getConnectionFactory(), jmsurl, cfProps)) {
+ // attempt to reserve the connector
+ try {
+ JMSConnectorManager.getInstance().reserve(conn);
+
+ if (log.isDebugEnabled()) {
+ log.debug("JMSConnectorFactory: Found matching connector");
+ }
+ }
+ catch (Exception e) {
+ // ignore. the connector may be in the process of shutting down, so try the next element
+ continue;
+ }
+
+ return conn;
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("JMSConnectorFactory: No matching connectors found");
+ }
+
+ return null;
+ }
+
+ /**
+ * Static method to create a server connector. Server connectors can
+ * accept incoming requests.
+ *
+ * @param connectorConfig
+ * @param cfConfig
+ * @param username
+ * @param password
+ * @return
+ * @throws Exception
+ */
+ public static JMSConnector createServerConnector(HashMap connectorConfig,
+ HashMap cfConfig,
+ String username,
+ String password,
+ JMSVendorAdapter adapter)
+ throws Exception {
+ return createConnector(connectorConfig, cfConfig, true,
+ username, password, adapter);
+ }
+
+ /**
+ * Static method to create a client connector. Client connectors cannot
+ * accept incoming requests.
+ *
+ * @param connectorConfig
+ * @param cfConfig
+ * @param username
+ * @param password
+ * @return
+ * @throws Exception
+ */
+ public static JMSConnector createClientConnector(HashMap connectorConfig,
+ HashMap cfConfig,
+ String username,
+ String password,
+ JMSVendorAdapter adapter)
+ throws Exception {
+ return createConnector(connectorConfig, cfConfig, false,
+ username, password, adapter);
+ }
+
+ private static JMSConnector createConnector(HashMap connectorConfig,
+ HashMap cfConfig,
+ boolean allowReceive,
+ String username,
+ String password,
+ JMSVendorAdapter adapter)
+ throws Exception {
+ if (connectorConfig != null)
+ connectorConfig = (HashMap) connectorConfig.clone();
+ int numRetries = MapUtils.removeIntProperty(connectorConfig,
+ JMSConstants.NUM_RETRIES,
+ JMSConstants.DEFAULT_NUM_RETRIES);
+
+ int numSessions = MapUtils.removeIntProperty(connectorConfig,
+ JMSConstants.NUM_SESSIONS,
+ JMSConstants.DEFAULT_NUM_SESSIONS);
+
+ long connectRetryInterval = MapUtils.removeLongProperty(connectorConfig,
+ JMSConstants.CONNECT_RETRY_INTERVAL,
+ JMSConstants.DEFAULT_CONNECT_RETRY_INTERVAL);
+
+ long interactRetryInterval = MapUtils.removeLongProperty(connectorConfig,
+ JMSConstants.INTERACT_RETRY_INTERVAL,
+ JMSConstants.DEFAULT_INTERACT_RETRY_INTERVAL);
+
+ long timeoutTime = MapUtils.removeLongProperty(connectorConfig,
+ JMSConstants.TIMEOUT_TIME,
+ JMSConstants.DEFAULT_TIMEOUT_TIME);
+
+ String clientID = MapUtils.removeStringProperty(connectorConfig,
+ JMSConstants.CLIENT_ID,
+ null);
+ String domain = MapUtils.removeStringProperty(connectorConfig,
+ JMSConstants.DOMAIN,
+ JMSConstants.DOMAIN_DEFAULT);
+
+ // this will be set if the target endpoint address was set on the Axis call
+ JMSURLHelper jmsurl = (JMSURLHelper) connectorConfig.get(JMSConstants.JMS_URL);
+
+ if (cfConfig == null)
+ throw new IllegalArgumentException("noCfConfig");
+
+ if (domain.equals(JMSConstants.DOMAIN_QUEUE)) {
+ return new QueueConnector(adapter.getQueueConnectionFactory(cfConfig),
+ numRetries, numSessions, connectRetryInterval,
+ interactRetryInterval, timeoutTime,
+ allowReceive, clientID, username, password,
+ adapter, jmsurl);
+ } else // domain is Topic
+ {
+ return new TopicConnector(adapter.getTopicConnectionFactory(cfConfig),
+ numRetries, numSessions, connectRetryInterval,
+ interactRetryInterval, timeoutTime,
+ allowReceive, clientID, username, password,
+ adapter, jmsurl);
+ }
+ }
+}
\ No newline at end of file
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,408 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.i18n.Messages;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * JMSConnectorManager manages a pool of connectors and works with the
+ * vendor adapters to support the reuse of JMS connections.
+ */
+public class JMSConnectorManager {
+ protected static Log log =
+ LogFactory.getLog(JMSConnectorManager.class.getName());
+
+ private static JMSConnectorManager s_instance = new JMSConnectorManager();
+
+ private static HashMap vendorConnectorPools = new HashMap();
+ private int DEFAULT_WAIT_FOR_SHUTDOWN = 90000; // 1.5 minutes
+
+ private JMSConnectorManager() {
+ }
+
+ public static JMSConnectorManager getInstance() {
+ return s_instance;
+ }
+
+ /**
+ * Returns the pool of JMSConnectors for a particular vendor
+ */
+ public ShareableObjectPool getVendorPool(String vendorId) {
+ return (ShareableObjectPool) vendorConnectorPools.get(vendorId);
+ }
+
+ /**
+ * Retrieves a JMSConnector that satisfies the provided connector criteria
+ */
+ public JMSConnector getConnector(HashMap connectorProperties,
+ HashMap connectionFactoryProperties,
+ String username,
+ String password,
+ JMSVendorAdapter vendorAdapter)
+ throws AxisFault {
+ JMSConnector connector = null;
+
+ try {
+ // check for a vendor-specific pool, and create if necessary
+ ShareableObjectPool vendorConnectors = getVendorPool(vendorAdapter.getVendorId());
+ if (vendorConnectors == null) {
+ synchronized (vendorConnectorPools) {
+ vendorConnectors = getVendorPool(vendorAdapter.getVendorId());
+ if (vendorConnectors == null) {
+ vendorConnectors = new ShareableObjectPool();
+ vendorConnectorPools.put(vendorAdapter.getVendorId(), vendorConnectors);
+ }
+ }
+ }
+
+ // look for a matching JMSConnector among existing connectors
+ synchronized (vendorConnectors) {
+ try {
+
+ connector = JMSConnectorFactory.matchConnector(vendorConnectors.getElements(),
+ connectorProperties,
+ connectionFactoryProperties,
+ username,
+ password,
+ vendorAdapter);
+ }
+ catch (Exception e) {
+ } // ignore. a new connector will be created if no match is found
+
+ if (connector == null) {
+ connector = JMSConnectorFactory.createClientConnector(connectorProperties,
+ connectionFactoryProperties,
+ username,
+ password,
+ vendorAdapter);
+ connector.start();
+ }
+ }
+ }
+ catch (Exception e) {
+ log.error(Messages.getMessage("cannotConnectError"), e);
+
+ if (e instanceof AxisFault)
+ throw (AxisFault) e;
+ throw new AxisFault("cannotConnect", e);
+ }
+
+ return connector;
+ }
+
+ /**
+ * Closes JMSConnectors in all pools
+ */
+ void closeAllConnectors() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: JMSConnectorManager::closeAllConnectors");
+ }
+
+ synchronized (vendorConnectorPools) {
+ Iterator iter = vendorConnectorPools.values().iterator();
+ while (iter.hasNext()) {
+ // close all connectors in the vendor pool
+ ShareableObjectPool pool = (ShareableObjectPool) iter.next();
+ synchronized (pool) {
+ java.util.Iterator connectors = pool.getElements().iterator();
+ while (connectors.hasNext()) {
+ JMSConnector conn = (JMSConnector) connectors.next();
+ try {
+ // shutdown automatically decrements the ref count of a connector before closing it
+ // call reserve() to simulate the checkout
+ reserve(conn);
+ closeConnector(conn);
+ }
+ catch (Exception e) {
+ } // ignore. the connector is already being deactivated
+ }
+ }
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: JMSConnectorManager::closeAllConnectors");
+ }
+ }
+
+ /**
+ * Closes JMS connectors that match the specified endpoint address
+ */
+ void closeMatchingJMSConnectors(HashMap connectorProps, HashMap cfProps,
+ String username, String password,
+ JMSVendorAdapter vendorAdapter) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: JMSConnectorManager::closeMatchingJMSConnectors");
+ }
+
+ try {
+ String vendorId = vendorAdapter.getVendorId();
+
+ // get the vendor-specific pool of connectors
+ ShareableObjectPool vendorConnectors = null;
+ synchronized (vendorConnectorPools) {
+ vendorConnectors = getVendorPool(vendorId);
+ }
+
+ // it's possible that there is no pool for that vendor
+ if (vendorConnectors == null)
+ return;
+
+ synchronized (vendorConnectors) {
+ // close any matched connectors
+ JMSConnector connector = null;
+ while ((vendorConnectors.size() > 0) &&
+ (connector = JMSConnectorFactory.matchConnector(vendorConnectors.getElements(),
+ connectorProps,
+ cfProps,
+ username,
+ password,
+ vendorAdapter)) != null) {
+ closeConnector(connector);
+ }
+ }
+ }
+ catch (Exception e) {
+ log.warn(Messages.getMessage("failedJMSConnectorShutdown"), e);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: JMSConnectorManager::closeMatchingJMSConnectors");
+ }
+ }
+
+ private void closeConnector(JMSConnector conn) {
+ conn.stop();
+ conn.shutdown();
+ }
+
+ /**
+ * Adds a JMSConnector to the appropriate vendor pool
+ */
+ public void addConnectorToPool(JMSConnector conn) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: JMSConnectorManager::addConnectorToPool");
+ }
+
+ ShareableObjectPool vendorConnectors = null;
+ synchronized (vendorConnectorPools) {
+ String vendorId = conn.getVendorAdapter().getVendorId();
+ vendorConnectors = getVendorPool(vendorId);
+ // it's possible the pool does not yet exist (if, for example, the connector
+ // is created before invoking the call/JMSTransport, as is the case with
+ // SimpleJMSListener)
+ if (vendorConnectors == null) {
+ vendorConnectors = new ShareableObjectPool();
+ vendorConnectorPools.put(vendorId, vendorConnectors);
+ }
+ }
+
+ synchronized (vendorConnectors) {
+ vendorConnectors.addObject(conn);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: JMSConnectorManager::addConnectorToPool");
+ }
+ }
+
+ /**
+ * Removes a JMSConnector from the appropriate vendor pool
+ */
+ public void removeConnectorFromPool(JMSConnector conn) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: JMSConnectorManager::removeConnectorFromPool");
+ }
+
+ ShareableObjectPool vendorConnectors = null;
+ synchronized (vendorConnectorPools) {
+ vendorConnectors = getVendorPool(conn.getVendorAdapter().getVendorId());
+ }
+ if (vendorConnectors == null)
+ return;
+
+ synchronized (vendorConnectors) {
+ // first release, to decrement the ref count (it is automatically incremented when
+ // the connector is matched)
+ vendorConnectors.release(conn);
+ vendorConnectors.removeObject(conn);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: JMSConnectorManager::removeConnectorFromPool");
+ }
+ }
+
+ /**
+ * Performs a non-exclusive checkout of the JMSConnector
+ */
+ public void reserve(JMSConnector connector) throws Exception {
+ ShareableObjectPool pool = null;
+ synchronized (vendorConnectorPools) {
+ pool = getVendorPool(connector.getVendorAdapter().getVendorId());
+ }
+ if (pool != null)
+ pool.reserve(connector);
+ }
+
+ /**
+ * Performs a non-exclusive checkin of the JMSConnector
+ */
+ public void release(JMSConnector connector) {
+ ShareableObjectPool pool = null;
+ synchronized (vendorConnectorPools) {
+ pool = getVendorPool(connector.getVendorAdapter().getVendorId());
+ }
+ if (pool != null)
+ pool.release(connector);
+ }
+
+ /**
+ * A simple non-blocking pool impl for objects that can be shared.
+ * Only a ref count is necessary to prevent collisions at shutdown.
+ * Todo: max size, cleanup stale connections
+ */
+ public class ShareableObjectPool {
+ // maps object to ref count wrapper
+ private java.util.HashMap m_elements;
+
+ // holds objects which should no longer be leased (pending removal)
+ private java.util.HashMap m_expiring;
+
+ private int m_numElements = 0;
+
+ public ShareableObjectPool() {
+ m_elements = new java.util.HashMap();
+ m_expiring = new java.util.HashMap();
+ }
+
+ /**
+ * Adds the object to the pool, if not already added
+ */
+ public void addObject(Object obj) {
+ ReferenceCountedObject ref = new ReferenceCountedObject(obj);
+ synchronized (m_elements) {
+ if (!m_elements.containsKey(obj) && !m_expiring.containsKey(obj))
+ m_elements.put(obj, ref);
+ }
+ }
+
+ /**
+ * Removes the object from the pool. If the object is reserved,
+ * waits the specified time before forcibly removing
+ * Todo: check expirations with the next request instead of holding up the current request
+ */
+ public void removeObject(Object obj, long waitTime) {
+ ReferenceCountedObject ref = null;
+ synchronized (m_elements) {
+ ref = (ReferenceCountedObject) m_elements.get(obj);
+ if (ref == null)
+ return;
+
+ m_elements.remove(obj);
+
+ if (ref.count() == 0)
+ return;
+ else
+ // mark the object for expiration
+ m_expiring.put(obj, ref);
+ }
+
+ // connector is now marked for expiration. wait for the ref count to drop to zero
+ long expiration = System.currentTimeMillis() + waitTime;
+ while (ref.count() > 0) {
+ try {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e) {
+ } // ignore
+ if (System.currentTimeMillis() > expiration)
+ break;
+ }
+
+ // also clear from the expiring list
+ m_expiring.remove(obj);
+ }
+
+ public void removeObject(Object obj) {
+ removeObject(obj, DEFAULT_WAIT_FOR_SHUTDOWN);
+ }
+
+ /**
+ * Marks the connector as in use by incrementing the connector's reference count
+ */
+ public void reserve(Object obj) throws Exception {
+ synchronized (m_elements) {
+ if (m_expiring.containsKey(obj))
+ throw new Exception("resourceUnavailable");
+
+ ReferenceCountedObject ref = (ReferenceCountedObject) m_elements.get(obj);
+ ref.increment();
+ }
+ }
+
+ /**
+ * Decrements the connector's reference count
+ */
+ public void release(Object obj) {
+ synchronized (m_elements) {
+ ReferenceCountedObject ref = (ReferenceCountedObject) m_elements.get(obj);
+ ref.decrement();
+ }
+ }
+
+ public synchronized java.util.Set getElements() {
+ return m_elements.keySet();
+ }
+
+ public synchronized int size() {
+ return m_elements.size();
+ }
+
+ /**
+ * Wrapper to track the use count of an object
+ */
+ public class ReferenceCountedObject {
+ private Object m_object;
+ private int m_refCount;
+
+ public ReferenceCountedObject(Object obj) {
+ m_object = obj;
+ m_refCount = 0;
+ }
+
+ public synchronized void increment() {
+ m_refCount++;
+ }
+
+ public synchronized void decrement() {
+ if (m_refCount > 0)
+ m_refCount--;
+ }
+
+ public synchronized int count() {
+ return m_refCount;
+ }
+ }
+ }
+}
\ No newline at end of file
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.Session;
+
+/**
+ * JMSConstants contains constants that apply to all JMS providers.
+ * <p/>
+ * <code>JMSConstants</code> contains the constant definitions for
+ * interacting with the WSClient. The most important constants are the
+ * <code>HashMap</code> keys for use in the arguments to the
+ * <code>send, call, registerListener, unregisterListener</code> methods of
+ * <code>JMSEndpoint</code> and the <code>createConnector</code> method of
+ * <code>JMSConnectorFactory</code>.
+ */
+
+public interface JMSConstants {
+ public final static String PROTOCOL = "jms";
+
+ // abbreviated version of all constants (see below for description of each constant)
+ // the short name is used in the JMS URL. the full name is used in the Axis call.
+ final static String _WAIT_FOR_RESPONSE = "waitForResponse";
+ final static String _CLIENT_ID = "clientID";
+ final static String _VENDOR = "vendor";
+ final static String _DOMAIN = "domain";
+ final static String _JMS_CORRELATION_ID = "jmsCorrelationID";
+ final static String _JMS_CORRELATION_ID_AS_BYTES = "jmsCorrelationIDAsBytes";
+ final static String _JMS_TYPE = "jmsType";
+ final static String _TIME_TO_LIVE = "ttl";
+ final static String _PRIORITY = "priority";
+ final static String _DELIVERY_MODE = "deliveryMode";
+ final static String _MESSAGE_SELECTOR = "messageSelector";
+ final static String _ACKNOWLEDGE_MODE = "acknowledgeMode";
+ final static String _SUBSCRIPTION_NAME = "subscriptionName";
+ final static String _UNSUBSCRIBE = "unsubscribe";
+ final static String _NO_LOCAL = "noLocal";
+ final static String _NUM_RETRIES = "numRetries";
+ final static String _NUM_SESSIONS = "numSessions";
+ final static String _CONNECT_RETRY_INTERVAL = "connectRetryInterval";
+ final static String _INTERACT_RETRY_INTERVAL = "interactRetryInterval";
+ final static String _TIMEOUT_TIME = "timeoutTime";
+ final static String _MIN_TIMEOUT_TIME = "minTimeoutTime";
+ /**
+ * Defines a prefix added to each application-specific property in the
+ * JMS URL that should be added to the JMS Message when issued.
+ */
+ final static String _MSG_PROP_PREFIX = "msgProp.";
+
+ public static String JMS_PROPERTY_PREFIX = "transport.jms.";
+
+ /**
+ * This is used as a key in the Call properties telling the JMS transport
+ * to wait for a response from the service. The default value is true.
+ * If false is specified, the message will be delivered without specifying
+ * a ReplyTo. The client will always return null from invoke unless
+ * a client-side exception is thrown (similar to invokeOneWay in semantics)
+ * The value must be a <code>java.lang.Boolean</code>.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String WAIT_FOR_RESPONSE = JMS_PROPERTY_PREFIX + _WAIT_FOR_RESPONSE;
+
+ /**
+ * <code>JMSConnectorFactory</code> parameter valid for either domain. This should
+ * be used as a key in the environment map passed into calls to
+ * <code>createConnector</code> in <code>JMSConnectorFactory</code>
+ * This is a required property for durable subscribers.
+ * The value must be a <code>java.lang.String</code>.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String CLIENT_ID = JMS_PROPERTY_PREFIX + _CLIENT_ID;
+
+ // there is no short version
+ final static String DESTINATION = JMS_PROPERTY_PREFIX + "Destination";
+
+ final static String VENDOR = JMS_PROPERTY_PREFIX + _VENDOR;
+ public final static String JNDI_VENDOR_ID = "JNDI";
+
+ final static String DOMAIN = JMS_PROPERTY_PREFIX + _DOMAIN;
+
+ final static String DOMAIN_QUEUE = "QUEUE";
+ final static String DOMAIN_TOPIC = "TOPIC";
+ final static String DOMAIN_DEFAULT = DOMAIN_QUEUE;
+
+ /**
+ * Key for properties used in the <code>send</code> and <code>call</code>
+ * methods. It is valid for either domain.
+ * The value must be a <code>java.lang.String</code>.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String JMS_CORRELATION_ID = JMS_PROPERTY_PREFIX + _JMS_CORRELATION_ID;
+ /**
+ * Key for properties used in the <code>send</code> and <code>call</code>
+ * methods. It is valid for either domain.
+ * The value must be a <code>byte[]</code>.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String JMS_CORRELATION_ID_AS_BYTES = JMS_PROPERTY_PREFIX + _JMS_CORRELATION_ID_AS_BYTES;
+ /**
+ * Key for properties used in the <code>send</code> and <code>call</code>
+ * methods. It is valid for either domain.
+ * The value must be a <code>java.lang.String</code>.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String JMS_TYPE = JMS_PROPERTY_PREFIX + _JMS_TYPE;
+ /**
+ * Key for properties used in the <code>send</code> and <code>call</code>
+ * methods. It is valid for either domain.
+ * The value must be a <code>java.lang.Long</code>.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String TIME_TO_LIVE = JMS_PROPERTY_PREFIX + _TIME_TO_LIVE;
+ /**
+ * Key for properties used in the <code>send</code> and <code>call</code>
+ * methods. It is valid for either domain.
+ * The value must be a <code>java.lang.Integer</code>.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String PRIORITY = JMS_PROPERTY_PREFIX + _PRIORITY;
+ /**
+ * Key for properties used in the <code>send</code> and <code>call</code>
+ * methods. It is valid for either domain.
+ * The value must be a <code>java.lang.Integer</code> equal to
+ * DeliveryMode.NON_PERSISTENT or DeliveryMode.PERSISTENT.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String DELIVERY_MODE = JMS_PROPERTY_PREFIX + _DELIVERY_MODE;
+
+ final static String DELIVERY_MODE_PERSISTENT = "Persistent";
+ final static String DELIVERY_MODE_NONPERSISTENT = "Nonpersistent";
+ final static String DELIVERY_MODE_DISCARDABLE = "Discardable";
+ final static int DEFAULT_DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
+
+ final static int DEFAULT_PRIORITY = Message.DEFAULT_PRIORITY;
+ final static long DEFAULT_TIME_TO_LIVE = Message.DEFAULT_TIME_TO_LIVE;
+
+ /**
+ * Key for properties used in the <code>registerListener</code>
+ * method. It is valid for either domain.
+ * The value must be a <code>java.lang.String</code>.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String MESSAGE_SELECTOR = JMS_PROPERTY_PREFIX + _MESSAGE_SELECTOR;
+ /**
+ * Key for properties used in the <code>registerListener</code>
+ * method. It is valid for either domain.
+ * The value must be a <code>java.lang.Integer</code> that is one of
+ * Session.AUTO_ACKNOWLEDGE, Session.DUPS_OK_ACKNOWLEDGE,
+ * or Session.CLIENT_ACKNOWLEDGE.
+ * See the javax.jms javadoc for information on this property.
+ */
+ final static String ACKNOWLEDGE_MODE = JMS_PROPERTY_PREFIX + _ACKNOWLEDGE_MODE;
+
+ /**
+ * value for ACKNOWLEDGE_MODE if left unset. It is equal to
+ * Session.DUPS_OK_ACKNOWLEDGE.
+ */
+ final static int DEFAULT_ACKNOWLEDGE_MODE = Session.DUPS_OK_ACKNOWLEDGE;
+
+ /**
+ * Specifies the name of a durable subscription
+ * Key for properties used in the <code>registerListener</code>
+ * method. It is valid for the PubSub domain.
+ * The value must be a <code>java.lang.String</code>.
+ */
+ final static String SUBSCRIPTION_NAME = JMS_PROPERTY_PREFIX + _SUBSCRIPTION_NAME;
+ /**
+ * Key for properties used in the <code>registerListener</code>
+ * method. It is valid for the PubSub domain.
+ * Specifies that the durable subscription should be unsubscribed
+ * (deleted from the broker) when unregistered.
+ * The value must be a <code>java.lang.Boolean</code>.
+ */
+ final static String UNSUBSCRIBE = JMS_PROPERTY_PREFIX + _UNSUBSCRIBE;
+ /**
+ * Key for properties used in the <code>registerListener</code>
+ * method. It is valid for the PubSub domain.
+ * The value must be a <code>java.lang.Boolean</code>.
+ */
+ final static String NO_LOCAL = JMS_PROPERTY_PREFIX + _NO_LOCAL;
+
+ final static boolean DEFAULT_NO_LOCAL = false;
+ final static boolean DEFAULT_UNSUBSCRIBE = false;
+
+ /**
+ * Key for properties used in the <code>createConnector</code>
+ * method. It changes the behavior of the wsclient.
+ * The value must be a <code>java.lang.Integer</code>.
+ */
+ final static String NUM_RETRIES = JMS_PROPERTY_PREFIX + _NUM_RETRIES;
+ /**
+ * Key for properties used in the <code>createConnector</code>
+ * method. It changes the behavior of the wsclient.
+ * The value must be a <code>java.lang.Integer</code>.
+ */
+ final static String NUM_SESSIONS = JMS_PROPERTY_PREFIX + _NUM_SESSIONS;
+ /**
+ * Key for properties used in the <code>createConnector</code>
+ * method. It changes the behavior of the wsclient.
+ * The value must be a <code>java.lang.Long</code>.
+ */
+ final static String CONNECT_RETRY_INTERVAL = JMS_PROPERTY_PREFIX + _CONNECT_RETRY_INTERVAL;
+ /**
+ * Key for properties used in the <code>createConnector</code>
+ * method. It changes the behavior of the wsclient.
+ * The value must be a <code>java.lang.Long</code>.
+ */
+ final static String INTERACT_RETRY_INTERVAL = JMS_PROPERTY_PREFIX + _INTERACT_RETRY_INTERVAL;
+ /**
+ * Key for properties used in the <code>createConnector</code>
+ * method. It changes the behavior of the wsclient.
+ * The value must be a <code>java.lang.Long</code>.
+ */
+ final static String TIMEOUT_TIME = JMS_PROPERTY_PREFIX + _TIMEOUT_TIME;
+ /**
+ * Key for properties used in the <code>createConnector</code>
+ * method. It changes the behavior of the wsclient.
+ * The value must be a <code>java.lang.Long</code>.
+ */
+ final static String MIN_TIMEOUT_TIME = JMS_PROPERTY_PREFIX + _MIN_TIMEOUT_TIME;
+
+ final static int DEFAULT_NUM_RETRIES = 5;
+ final static int DEFAULT_NUM_SESSIONS = 5;
+
+ final static long DEFAULT_CONNECT_RETRY_INTERVAL = 2000;
+ final static long DEFAULT_TIMEOUT_TIME = 5000;
+ final static long DEFAULT_MIN_TIMEOUT_TIME = 1000;
+ final static long DEFAULT_INTERACT_RETRY_INTERVAL = 250;
+
+ // key used to store the JMS connector in the message context
+ final static String CONNECTOR = JMS_PROPERTY_PREFIX + "Connector";
+
+ // key used to store the JMS vendor adapter in the message context
+ final static String VENDOR_ADAPTER = JMS_PROPERTY_PREFIX + "VendorAdapter";
+
+ // key used to store the JMS URL string in the message context
+ final static String JMS_URL = JMS_PROPERTY_PREFIX + "EndpointAddress";
+
+ /**
+ * A property that carries a Map of application-specific properties to be
+ * added to the JMS messages when issued.
+ */
+ final static String JMS_APPLICATION_MSG_PROPS =
+ JMS_PROPERTY_PREFIX + "msgProps";
+
+ final static String ADAPTER_POSTFIX = "VendorAdapter";
+}
\ No newline at end of file