You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by di...@apache.org on 2005/12/16 18:18:08 UTC
svn commit: r357187 [19/25] - in
/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2: ./
addressing/ client/ client/async/ context/ deployment/ deployment/listener/
deployment/repository/util/ deployment/scheduler/ deployment/util/
descript...
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/http/server/TransparentProxyRequestHandler.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/http/server/TransparentProxyRequestHandler.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/http/server/TransparentProxyRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/http/server/TransparentProxyRequestHandler.java Fri Dec 16 09:13:57 2005
@@ -1,31 +1,32 @@
/*
- * $Header: /home/jerenkrantz/tmp/commons/commons-convert/cvs/home/cvs/jakarta-commons//httpclient/src/test/org/apache/commons/httpclient/server/TransparentProxyRequestHandler.java,v 1.7 2004/12/11 22:35:26 olegk Exp $
- * $Revision: 155418 $
- * $Date: 2005-02-26 08:01:52 -0500 (Sat, 26 Feb 2005) $
- *
- * ====================================================================
- *
- * Copyright 1999-2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
+* $Header: /home/jerenkrantz/tmp/commons/commons-convert/cvs/home/cvs/jakarta-commons//httpclient/src/test/org/apache/commons/httpclient/server/TransparentProxyRequestHandler.java,v 1.7 2004/12/11 22:35:26 olegk Exp $
+* $Revision: 155418 $
+* $Date: 2005-02-26 08:01:52 -0500 (Sat, 26 Feb 2005) $
+*
+* ====================================================================
+*
+* Copyright 1999-2004 The Apache Software Foundation
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ====================================================================
+*
+* This software consists of voluntary contributions made by many
+* individuals on behalf of the Apache Software Foundation. For more
+* information on the Apache Software Foundation, please see
+* <http://www.apache.org/>.
+*
+*/
+
package org.apache.axis2.transport.http.server;
@@ -44,47 +45,74 @@
* other HTTP methods.
*/
public class TransparentProxyRequestHandler implements HttpRequestHandler {
+ private static Socket connect(final String host) throws IOException {
+ String hostname = null;
+ int port;
+ int i = host.indexOf(':');
+
+ if (i != -1) {
+ hostname = host.substring(0, i);
+
+ try {
+ port = Integer.parseInt(host.substring(i + 1));
+ } catch (NumberFormatException ex) {
+ throw new IOException("Invalid host address: " + host);
+ }
+ } else {
+ hostname = host;
+ port = 80;
+ }
+
+ return new Socket(hostname, port);
+ }
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.commons.httpclient.server.HttpRequestHandler#processRequest(org.apache.commons.httpclient.server.SimpleHttpServerConnection)
*/
- public boolean processRequest(
- final SimpleHttpServerConnection conn,
- final SimpleRequest request) throws IOException
- {
-
+ public boolean processRequest(final SimpleHttpServerConnection conn,
+ final SimpleRequest request)
+ throws IOException {
RequestLine line = request.getRequestLine();
HttpVersion ver = line.getHttpVersion();
String method = line.getMethod();
+
if (!"CONNECT".equalsIgnoreCase(method)) {
return false;
}
+
Socket targetSocket = null;
+
try {
targetSocket = connect(line.getUri());
} catch (IOException e) {
SimpleResponse response = new SimpleResponse();
+
response.setStatusLine(ver, HttpStatus.SC_NOT_FOUND);
response.setHeader(new Header("Server", "test proxy"));
response.setBodyString("Cannot connect to " + line.getUri());
conn.writeResponse(response);
+
return true;
}
+
SimpleResponse response = new SimpleResponse();
+
response.setHeader(new Header("Server", "test proxy"));
response.setStatusLine(ver, HttpStatus.SC_OK, "Connection established");
conn.writeResponse(response);
-
- SimpleHttpServerConnection target = new SimpleHttpServerConnection(targetSocket);
+
+ SimpleHttpServerConnection target = new SimpleHttpServerConnection(targetSocket);
+
pump(conn, target);
+
return true;
}
- private void pump(final SimpleHttpServerConnection source, final SimpleHttpServerConnection target)
- throws IOException {
-
+ private void pump(final SimpleHttpServerConnection source,
+ final SimpleHttpServerConnection target)
+ throws IOException {
source.setSocketTimeout(100);
target.setSocketTimeout(100);
@@ -92,29 +120,35 @@
OutputStream sourceOut = source.getOutputStream();
InputStream targetIn = target.getInputStream();
OutputStream targetOut = target.getOutputStream();
-
- byte[] tmp = new byte[1024];
+ byte[] tmp = new byte[1024];
int l;
- for (;;) {
- if (!source.isOpen() || !target.isOpen()) {
+
+ for (; ;) {
+ if (!source.isOpen() || !target.isOpen()) {
break;
}
+
try {
l = sourceIn.read(tmp);
+
if (l == -1) {
break;
}
+
targetOut.write(tmp, 0, l);
} catch (InterruptedIOException ignore) {
if (Thread.interrupted()) {
break;
}
}
+
try {
l = targetIn.read(tmp);
+
if (l == -1) {
break;
}
+
sourceOut.write(tmp, 0, l);
} catch (InterruptedIOException ignore) {
if (Thread.interrupted()) {
@@ -123,23 +157,4 @@
}
}
}
-
- private static Socket connect(final String host) throws IOException {
- String hostname = null;
- int port;
- int i = host.indexOf(':');
- if (i != -1) {
- hostname = host.substring(0, i);
- try {
- port = Integer.parseInt(host.substring(i + 1));
- } catch (NumberFormatException ex) {
- throw new IOException("Invalid host address: " + host);
- }
- } else {
- hostname = host;
- port = 80;
- }
- return new Socket(hostname, port);
- }
-
-}
\ No newline at end of file
+}
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.jms;
@@ -30,51 +31,58 @@
* the input sessions to create destinations.
*/
public abstract class BeanVendorAdapter extends JMSVendorAdapter {
- protected final static String CONNECTION_FACTORY_CLASS =
- "transport.jms.ConnectionFactoryClass";
-
- public QueueConnectionFactory getQueueConnectionFactory(HashMap cfConfig)
- throws Exception {
- return (QueueConnectionFactory) getConnectionFactory(cfConfig);
- }
+ protected final static String CONNECTION_FACTORY_CLASS = "transport.jms.ConnectionFactoryClass";
- public TopicConnectionFactory getTopicConnectionFactory(HashMap cfConfig)
- throws Exception {
- return (TopicConnectionFactory) getConnectionFactory(cfConfig);
- }
-
- private ConnectionFactory getConnectionFactory(HashMap cfConfig)
- throws Exception {
- String classname = (String) cfConfig.get(CONNECTION_FACTORY_CLASS);
- if (classname == null || classname.trim().length() == 0)
- throw new IllegalArgumentException("noCFClass");
-
- Class factoryClass = Loader.loadClass(classname);
- ConnectionFactory factory = (ConnectionFactory) factoryClass.newInstance();
- callSetters(cfConfig, factoryClass, factory);
- return factory;
- }
-
- private void callSetters(HashMap cfConfig,
- Class factoryClass,
- ConnectionFactory factory)
+ private void callSetters(HashMap cfConfig, Class factoryClass, ConnectionFactory factory)
throws Exception {
PropertyDescriptor[] bpd = Introspector.getBeanInfo(factoryClass).getPropertyDescriptors();
+
for (int i = 0; i < bpd.length; i++) {
PropertyDescriptor thisBPD = bpd[i];
String propName = thisBPD.getName();
+
if (cfConfig.containsKey(propName)) {
Object value = cfConfig.get(propName);
- if (value == null)
+
+ if (value == null) {
continue;
+ }
String validType = thisBPD.getName();
- if (!value.getClass().getName().equals(validType))
+
+ if (!value.getClass().getName().equals(validType)) {
throw new IllegalArgumentException("badType");
- if (thisBPD.getWriteMethod() == null)
+ }
+
+ if (thisBPD.getWriteMethod() == null) {
throw new IllegalArgumentException("notWriteable");
+ }
+
thisBPD.getWriteMethod().invoke(factory, new Object[]{value});
}
}
}
-}
\ No newline at end of file
+
+ private ConnectionFactory getConnectionFactory(HashMap cfConfig) throws Exception {
+ String classname = (String) cfConfig.get(CONNECTION_FACTORY_CLASS);
+
+ if ((classname == null) || (classname.trim().length() == 0)) {
+ throw new IllegalArgumentException("noCFClass");
+ }
+
+ Class factoryClass = Loader.loadClass(classname);
+ ConnectionFactory factory = (ConnectionFactory) factoryClass.newInstance();
+
+ callSetters(cfConfig, factoryClass, factory);
+
+ return factory;
+ }
+
+ public QueueConnectionFactory getQueueConnectionFactory(HashMap cfConfig) throws Exception {
+ return (QueueConnectionFactory) getConnectionFactory(cfConfig);
+ }
+
+ public TopicConnectionFactory getTopicConnectionFactory(HashMap cfConfig) throws Exception {
+ return (TopicConnectionFactory) getConnectionFactory(cfConfig);
+ }
+}
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.jms;
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.jms;
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.jms;
@@ -39,30 +40,22 @@
* failures.
*/
public abstract class JMSConnector {
- protected int m_numRetries;
+ protected JMSVendorAdapter m_adapter;
+ protected boolean m_allowReceive;
protected long m_connectRetryInterval;
protected long m_interactRetryInterval;
- protected long m_timeoutTime;
+ protected JMSURLHelper m_jmsurl;
+ protected int m_numRetries;
+ protected int m_numSessions;
protected long m_poolTimeout;
protected AsyncConnection m_receiveConnection;
protected SyncConnection m_sendConnection;
- protected int m_numSessions;
- protected boolean m_allowReceive;
- protected JMSVendorAdapter m_adapter;
- protected JMSURLHelper m_jmsurl;
+ protected long m_timeoutTime;
- public JMSConnector(ConnectionFactory connectionFactory,
- int numRetries,
- int numSessions,
- long connectRetryInterval,
- long interactRetryInterval,
- long timeoutTime,
- boolean allowReceive,
- String clientID,
- String username,
- String password,
- JMSVendorAdapter adapter,
- JMSURLHelper jmsurl)
+ public JMSConnector(ConnectionFactory connectionFactory, int numRetries, int numSessions,
+ long connectRetryInterval, long interactRetryInterval, long timeoutTime,
+ boolean allowReceive, String clientID, String username, String password,
+ JMSVendorAdapter adapter, JMSURLHelper jmsurl)
throws JMSException {
m_numRetries = numRetries;
m_connectRetryInterval = connectRetryInterval;
@@ -77,288 +70,368 @@
// try to connect initially so we can fail fast
// in the case of irrecoverable errors.
// If we fail in a recoverable fashion we will retry
- javax.jms.Connection sendConnection = createConnectionWithRetry(
- connectionFactory,
- username,
- password);
- m_sendConnection = createSyncConnection(connectionFactory, sendConnection,
- m_numSessions, "SendThread",
- clientID,
- username,
- password);
+ javax.jms.Connection sendConnection = createConnectionWithRetry(connectionFactory,
+ username, password);
+ m_sendConnection = createSyncConnection(connectionFactory, sendConnection, m_numSessions,
+ "SendThread", clientID, username, password);
m_sendConnection.start();
if (m_allowReceive) {
- javax.jms.Connection receiveConnection = createConnectionWithRetry(
- connectionFactory,
- username,
- password);
- m_receiveConnection = createAsyncConnection(connectionFactory,
- receiveConnection,
- "ReceiveThread",
- clientID,
- username,
- password);
+ javax.jms.Connection receiveConnection = createConnectionWithRetry(connectionFactory,
+ username, password);
+
+ m_receiveConnection = createAsyncConnection(connectionFactory, receiveConnection,
+ "ReceiveThread", clientID, username, password);
m_receiveConnection.start();
}
}
- public int getNumRetries() {
- return m_numRetries;
- }
-
- public int numSessions() {
- return m_numSessions;
- }
-
- public ConnectionFactory getConnectionFactory() {
- // there is always a send connection
- return getSendConnection().getConnectionFactory();
- }
-
- public String getClientID() {
- return getSendConnection().getClientID();
- }
-
- public String getUsername() {
- return getSendConnection().getUsername();
- }
-
- public String getPassword() {
- return getSendConnection().getPassword();
- }
-
- public JMSVendorAdapter getVendorAdapter() {
- return m_adapter;
- }
-
- public JMSURLHelper getJMSURL() {
- return m_jmsurl;
- }
+ protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection, String threadName, String clientID, String username,
+ String password)
+ throws JMSException;
- protected javax.jms.Connection createConnectionWithRetry(
- ConnectionFactory connectionFactory,
- String username,
- String password)
+ protected javax.jms.Connection createConnectionWithRetry(ConnectionFactory connectionFactory,
+ String username, String password)
throws JMSException {
javax.jms.Connection connection = null;
+
for (int numTries = 1; connection == null; numTries++) {
try {
connection = internalConnect(connectionFactory, username, password);
- }
- catch (JMSException jmse) {
- if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION) || numTries == m_numRetries)
+ } catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION)
+ || (numTries == m_numRetries)) {
throw jmse;
- else
+ } else {
try {
Thread.sleep(m_connectRetryInterval);
} catch (InterruptedException ie) {
}
+ }
;
}
}
+
return connection;
}
- public void stop() {
- JMSConnectorManager.getInstance().removeConnectorFromPool(this);
+ public abstract JMSEndpoint createEndpoint(Destination destination) throws JMSException;
- m_sendConnection.stopConnection();
- if (m_allowReceive)
- m_receiveConnection.stopConnection();
- }
+ public abstract JMSEndpoint createEndpoint(String destinationName) throws JMSException;
- public void start() {
- m_sendConnection.startConnection();
- if (m_allowReceive)
- m_receiveConnection.startConnection();
+ protected abstract SyncConnection createSyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection, int numSessions, String threadName, String clientID,
+ String username, String password)
+ throws JMSException;
- JMSConnectorManager.getInstance().addConnectorToPool(this);
+ protected abstract javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
+ String username, String password)
+ throws JMSException;
+
+ public int numSessions() {
+ return m_numSessions;
}
public void shutdown() {
m_sendConnection.shutdown();
- if (m_allowReceive)
+
+ if (m_allowReceive) {
m_receiveConnection.shutdown();
+ }
}
- public abstract JMSEndpoint createEndpoint(String destinationName)
- throws JMSException;
+ public void start() {
+ m_sendConnection.startConnection();
- public abstract JMSEndpoint createEndpoint(Destination destination)
- throws JMSException;
+ if (m_allowReceive) {
+ m_receiveConnection.startConnection();
+ }
+ JMSConnectorManager.getInstance().addConnectorToPool(this);
+ }
- protected abstract javax.jms.Connection internalConnect(
- ConnectionFactory connectionFactory,
- String username,
- String password)
- throws JMSException;
+ public void stop() {
+ JMSConnectorManager.getInstance().removeConnectorFromPool(this);
+ m_sendConnection.stopConnection();
- private abstract class Connection extends Thread implements ExceptionListener {
- private ConnectionFactory m_connectionFactory;
- protected javax.jms.Connection m_connection;
+ if (m_allowReceive) {
+ m_receiveConnection.stopConnection();
+ }
+ }
- protected boolean m_isActive;
- private boolean m_needsToConnect;
- private boolean m_startConnection;
- private String m_clientID;
- private String m_username;
- private String m_password;
+ public String getClientID() {
+ return getSendConnection().getClientID();
+ }
- private Object m_jmsLock;
- private Object m_lifecycleLock;
+ public ConnectionFactory getConnectionFactory() {
- protected Connection(ConnectionFactory connectionFactory,
- javax.jms.Connection connection,
- String threadName,
- String clientID,
- String username,
- String password)
- throws JMSException {
- super(threadName);
- m_connectionFactory = connectionFactory;
+ // there is always a send connection
+ return getSendConnection().getConnectionFactory();
+ }
- m_clientID = clientID;
- m_username = username;
- m_password = password;
+ public JMSURLHelper getJMSURL() {
+ return m_jmsurl;
+ }
- m_jmsLock = new Object();
- m_lifecycleLock = new Object();
+ public int getNumRetries() {
+ return m_numRetries;
+ }
- if (connection != null) {
- m_needsToConnect = false;
- m_connection = connection;
- m_connection.setExceptionListener(this);
- if (m_clientID != null)
- m_connection.setClientID(m_clientID);
- } else {
- m_needsToConnect = true;
- }
+ public String getPassword() {
+ return getSendConnection().getPassword();
+ }
- m_isActive = true;
- }
+ AsyncConnection getReceiveConnection() {
+ return m_receiveConnection;
+ }
- public ConnectionFactory getConnectionFactory() {
- return m_connectionFactory;
- }
+ SyncConnection getSendConnection() {
+ return m_sendConnection;
+ }
- public String getClientID() {
- return m_clientID;
- }
+ public String getUsername() {
+ return getSendConnection().getUsername();
+ }
- public String getUsername() {
- return m_username;
- }
+ public JMSVendorAdapter getVendorAdapter() {
+ return m_adapter;
+ }
- public String getPassword() {
- return m_password;
+ protected abstract class AsyncConnection extends Connection {
+ Object m_subscriptionLock;
+ HashMap m_subscriptions;
+
+ protected AsyncConnection(ConnectionFactory connectionFactory,
+ javax.jms.Connection connection, String threadName,
+ String clientID, String username, String password)
+ throws JMSException {
+ super(connectionFactory, connection, threadName, clientID, username, password);
+ m_subscriptions = new HashMap();
+ m_subscriptionLock = new Object();
}
- /**
- * @todo handle non-recoverable errors
- */
+ protected abstract ListenerSession createListenerSession(javax.jms.Connection connection,
+ Subscription subscription)
+ throws Exception;
- public void run() {
- // loop until a connection is made and when a connection is made (re)establish
- // any subscriptions
- while (m_isActive) {
- if (m_needsToConnect) {
- m_connection = null;
- try {
- m_connection = internalConnect(m_connectionFactory,
- m_username, m_password);
- m_connection.setExceptionListener(this);
- if (m_clientID != null)
- m_connection.setClientID(m_clientID);
- }
- catch (JMSException e) {
- // simply backoff for a while and then retry
- try {
- Thread.sleep(m_connectRetryInterval);
- } catch (InterruptedException ie) {
- }
- continue;
- }
- } else
- m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because
- // we lost the connection
+ protected void onConnect() throws Exception {
+ synchronized (m_subscriptionLock) {
+ Iterator subscriptions = m_subscriptions.keySet().iterator();
- // we now have a valid connection so establish some context
- try {
- internalOnConnect();
- }
- catch (Exception e) {
- // insert code to handle non recoverable errors
- // simply retry
- continue;
- }
+ while (subscriptions.hasNext()) {
+ Subscription subscription = (Subscription) subscriptions.next();
- synchronized (m_jmsLock) {
- try {
- m_jmsLock.wait();
- } catch (InterruptedException ie) {
- } // until notified due to some change in status
+ if (m_subscriptions.get(subscription) == null) {
+ m_subscriptions.put(subscription,
+ createListenerSession(m_connection, subscription));
+ }
}
- }
- // no longer staying connected, so see what we can cleanup
- internalOnShutdown();
+ m_subscriptionLock.notifyAll();
+ }
}
+ protected void onException() {
+ synchronized (m_subscriptionLock) {
+ Iterator subscriptions = m_subscriptions.keySet().iterator();
- void startConnection() {
- synchronized (m_lifecycleLock) {
- if (m_startConnection)
- return;
- m_startConnection = true;
- try {
- m_connection.start();
- } catch (Throwable e) {
- } // ignore
- }
- }
+ while (subscriptions.hasNext()) {
+ Subscription subscription = (Subscription) subscriptions.next();
- void stopConnection() {
- synchronized (m_lifecycleLock) {
- if (!m_startConnection)
- return;
- m_startConnection = false;
- try {
- m_connection.stop();
- } catch (Throwable e) {
- } // ignore
+ m_subscriptions.put(subscription, null);
+ }
}
}
- void shutdown() {
- m_isActive = false;
- synchronized (m_jmsLock) {
- m_jmsLock.notifyAll();
+ protected void onShutdown() {
+ synchronized (m_subscriptionLock) {
+ Iterator subscriptions = m_subscriptions.keySet().iterator();
+
+ while (subscriptions.hasNext()) {
+ Subscription subscription = (Subscription) subscriptions.next();
+ ListenerSession session =
+ (ListenerSession) m_subscriptions.get(subscription);
+
+ if (session != null) {
+ session.cleanup();
+ }
+ }
+
+ m_subscriptions.clear();
}
}
-
- public void onException(JMSException exception) {
- if (m_adapter.isRecoverable(exception,
- JMSVendorAdapter.ON_EXCEPTION_ACTION))
- return;
- onException();
- synchronized (m_jmsLock) {
- m_jmsLock.notifyAll();
+ /**
+ * @param subscription
+ * @todo add in security exception propagation
+ */
+ void subscribe(Subscription subscription) throws Exception {
+ long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+
+ synchronized (m_subscriptionLock) {
+ if (m_subscriptions.containsKey(subscription)) {
+ return;
+ }
+
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Cannot subscribe listener");
+ }
+
+ try {
+ ListenerSession session = createListenerSession(m_connection, subscription);
+
+ m_subscriptions.put(subscription, session);
+
+ break;
+ } catch (JMSException jmse) {
+ if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) {
+ throw jmse;
+ }
+
+ try {
+ m_subscriptionLock.wait(m_interactRetryInterval);
+ } catch (InterruptedException ignore) {
+ }
+
+ // give reconnect a chance
+ Thread.yield();
+
+ continue;
+ } catch (NullPointerException jmse) {
+
+ // we ARE reconnecting
+ try {
+ m_subscriptionLock.wait(m_interactRetryInterval);
+ } catch (InterruptedException ignore) {
+ }
+
+ // give reconnect a chance
+ Thread.yield();
+
+ continue;
+ }
+ }
}
}
- private final void internalOnConnect()
- throws Exception {
+ void unsubscribe(Subscription subscription) {
+ long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+
+ synchronized (m_subscriptionLock) {
+ if (!m_subscriptions.containsKey(subscription)) {
+ return;
+ }
+
+ while (true) {
+ if (System.currentTimeMillis() > timeoutTime) {
+ throw new InvokeTimeoutException("Cannot unsubscribe listener");
+ }
+
+ // give reconnect a chance
+ Thread.yield();
+
+ try {
+ ListenerSession session =
+ (ListenerSession) m_subscriptions.get(subscription);
+
+ session.cleanup();
+ m_subscriptions.remove(subscription);
+
+ break;
+ } catch (NullPointerException jmse) {
+
+ // we are reconnecting
+ try {
+ m_subscriptionLock.wait(m_interactRetryInterval);
+ } catch (InterruptedException ignore) {
+ }
+
+ continue;
+ }
+ }
+ }
+ }
+
+ protected class ListenerSession extends ConnectorSession {
+ protected MessageConsumer m_consumer;
+ protected Subscription m_subscription;
+
+ ListenerSession(Session session, MessageConsumer consumer, Subscription subscription)
+ throws Exception {
+ super(session);
+ m_subscription = subscription;
+ m_consumer = consumer;
+
+ Destination destination = subscription.m_endpoint.getDestination(m_session);
+
+ m_consumer.setMessageListener(subscription.m_listener);
+ }
+
+ void cleanup() {
+ try {
+ m_consumer.close();
+ } catch (Exception ignore) {
+ }
+
+ try {
+ m_session.close();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ }
+
+
+ private abstract class Connection extends Thread implements ExceptionListener {
+ private String m_clientID;
+ protected javax.jms.Connection m_connection;
+ private ConnectionFactory m_connectionFactory;
+ protected boolean m_isActive;
+ private Object m_jmsLock;
+ private Object m_lifecycleLock;
+ private boolean m_needsToConnect;
+ private String m_password;
+ private boolean m_startConnection;
+ private String m_username;
+
+ protected Connection(ConnectionFactory connectionFactory, javax.jms.Connection connection,
+ String threadName, String clientID, String username, String password)
+ throws JMSException {
+ super(threadName);
+ m_connectionFactory = connectionFactory;
+ m_clientID = clientID;
+ m_username = username;
+ m_password = password;
+ m_jmsLock = new Object();
+ m_lifecycleLock = new Object();
+
+ if (connection != null) {
+ m_needsToConnect = false;
+ m_connection = connection;
+ m_connection.setExceptionListener(this);
+
+ if (m_clientID != null) {
+ m_connection.setClientID(m_clientID);
+ }
+ } else {
+ m_needsToConnect = true;
+ }
+
+ m_isActive = true;
+ }
+
+ private final void internalOnConnect() throws Exception {
onConnect();
+
synchronized (m_lifecycleLock) {
if (m_startConnection) {
try {
m_connection.start();
} catch (Throwable e) {
- } // ignore
+ } // ignore
}
}
}
@@ -366,162 +439,300 @@
private final void internalOnShutdown() {
stopConnection();
onShutdown();
+
try {
m_connection.close();
} catch (Throwable e) {
- } // ignore
+ } // ignore
}
protected abstract void onConnect() throws Exception;
+ protected abstract void onException();
+
+ public void onException(JMSException exception) {
+ if (m_adapter.isRecoverable(exception, JMSVendorAdapter.ON_EXCEPTION_ACTION)) {
+ return;
+ }
+
+ onException();
+
+ synchronized (m_jmsLock) {
+ m_jmsLock.notifyAll();
+ }
+ }
+
protected abstract void onShutdown();
- protected abstract void onException();
+ /**
+ * @todo handle non-recoverable errors
+ */
+ public void run() {
+
+ // Loop until a connection is made; once connected, (re)establish
+ // any subscriptions.
+ while (m_isActive) {
+ if (m_needsToConnect) {
+ m_connection = null;
+
+ try {
+ m_connection = internalConnect(m_connectionFactory, m_username, m_password);
+ m_connection.setExceptionListener(this);
+
+ if (m_clientID != null) {
+ m_connection.setClientID(m_clientID);
+ }
+ } catch (JMSException e) {
+
+ // simply backoff for a while and then retry
+ try {
+ Thread.sleep(m_connectRetryInterval);
+ } catch (InterruptedException ie) {
+ }
+
+ continue;
+ }
+ } else {
+ m_needsToConnect = true; // the next time we reach the "if (m_needsToConnect)" check, it will be because we lost the connection
+ }
+
+
+ // we now have a valid connection so establish some context
+ try {
+ internalOnConnect();
+ } catch (Exception e) {
+
+ // TODO: handle non-recoverable errors here;
+ // for now, simply retry.
+ continue;
+ }
+
+ synchronized (m_jmsLock) {
+ try {
+ m_jmsLock.wait();
+ } catch (InterruptedException ie) {
+ } // until notified due to some change in status
+ }
+ }
+
+ // no longer staying connected, so see what we can cleanup
+ internalOnShutdown();
+ }
+
+ void shutdown() {
+ m_isActive = false;
+
+ synchronized (m_jmsLock) {
+ m_jmsLock.notifyAll();
+ }
+ }
+
+ void startConnection() {
+ synchronized (m_lifecycleLock) {
+ if (m_startConnection) {
+ return;
+ }
+
+ m_startConnection = true;
+
+ try {
+ m_connection.start();
+ } catch (Throwable e) {
+ } // ignore
+ }
+ }
+
+ void stopConnection() {
+ synchronized (m_lifecycleLock) {
+ if (!m_startConnection) {
+ return;
+ }
+
+ m_startConnection = false;
+
+ try {
+ m_connection.stop();
+ } catch (Throwable e) {
+ } // ignore
+ }
+ }
+
+ public String getClientID() {
+ return m_clientID;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return m_connectionFactory;
+ }
+
+ public String getPassword() {
+ return m_password;
+ }
+
+ public String getUsername() {
+ return m_username;
+ }
}
- protected abstract SyncConnection createSyncConnection(ConnectionFactory factory,
- javax.jms.Connection connection,
- int numSessions,
- String threadName,
- String clientID,
- String username,
- String password)
- throws JMSException;
+ private abstract class ConnectorSession {
+ Session m_session;
- SyncConnection getSendConnection() {
- return m_sendConnection;
+ ConnectorSession(Session session) throws JMSException {
+ m_session = session;
+ }
}
+
protected abstract class SyncConnection extends Connection {
- LinkedList m_senders;
int m_numSessions;
Object m_senderLock;
+ LinkedList m_senders;
- SyncConnection(ConnectionFactory connectionFactory,
- javax.jms.Connection connection,
- int numSessions,
- String threadName,
- String clientID,
- String username,
+ SyncConnection(ConnectionFactory connectionFactory, javax.jms.Connection connection,
+ int numSessions, String threadName, String clientID, String username,
String password)
throws JMSException {
- super(connectionFactory, connection, threadName,
- clientID, username, password);
+ super(connectionFactory, connection, threadName, clientID, username, password);
m_senders = new LinkedList();
m_numSessions = numSessions;
m_senderLock = new Object();
}
- protected abstract SendSession createSendSession(javax.jms.Connection connection)
- throws JMSException;
-
- protected void onConnect()
- throws JMSException {
- synchronized (m_senderLock) {
- for (int i = 0; i < m_numSessions; i++) {
- m_senders.add(createSendSession(m_connection));
- }
- m_senderLock.notifyAll();
- }
- }
-
byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
throws Exception {
long timeoutTime = System.currentTimeMillis() + timeout;
+
while (true) {
if (System.currentTimeMillis() > timeoutTime) {
throw new InvokeTimeoutException("Unable to complete call in time allotted");
}
SendSession sendSession = null;
+
try {
sendSession = getSessionFromPool(m_poolTimeout);
- byte[] response = sendSession.call(endpoint,
- message,
+
+ byte[] response = sendSession.call(endpoint, message,
timeoutTime - System.currentTimeMillis(),
properties);
+
returnSessionToPool(sendSession);
+
if (response == null) {
- throw new InvokeTimeoutException("Unable to complete call in time allotted");
+ throw new InvokeTimeoutException(
+ "Unable to complete call in time allotted");
}
+
return response;
- }
- catch (JMSException jmse) {
+ } catch (JMSException jmse) {
if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
- //this we cannot recover from
- //but it does not invalidate the session
+
+ // this we cannot recover from
+ // but it does not invalidate the session
returnSessionToPool(sendSession);
+
throw jmse;
}
- //for now we will assume this is a reconnect related issue
- //and let the sender be collected
- //give the reconnect thread a chance to fill the pool
+ // For now, assume this is a reconnect-related issue
+ // and let the sender be collected; yield to
+ // give the reconnect thread a chance to fill the pool.
Thread.yield();
+
continue;
- }
- catch (NullPointerException npe) {
+ } catch (NullPointerException npe) {
Thread.yield();
+
continue;
}
}
}
+ protected abstract SendSession createSendSession(javax.jms.Connection connection)
+ throws JMSException;
+
+ protected void onConnect() throws JMSException {
+ synchronized (m_senderLock) {
+ for (int i = 0; i < m_numSessions; i++) {
+ m_senders.add(createSendSession(m_connection));
+ }
+
+ m_senderLock.notifyAll();
+ }
+ }
+
+ protected void onException() {
+ synchronized (m_senderLock) {
+ m_senders.clear();
+ }
+ }
+
+ protected void onShutdown() {
+ synchronized (m_senderLock) {
+ Iterator senders = m_senders.iterator();
+
+ while (senders.hasNext()) {
+ SendSession session = (SendSession) senders.next();
+
+ session.cleanup();
+ }
+
+ m_senders.clear();
+ }
+ }
+
+ private void returnSessionToPool(SendSession sendSession) {
+ synchronized (m_senderLock) {
+ m_senders.addLast(sendSession);
+ m_senderLock.notifyAll();
+ }
+ }
+
/**
* @todo add in handling for security exceptions
* @todo add support for timeouts
*/
- void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
- throws Exception {
+ void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception {
long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+
while (true) {
if (System.currentTimeMillis() > timeoutTime) {
throw new InvokeTimeoutException("Cannot complete send in time allotted");
}
SendSession sendSession = null;
+
try {
sendSession = getSessionFromPool(m_poolTimeout);
sendSession.send(endpoint, message, properties);
returnSessionToPool(sendSession);
- }
- catch (JMSException jmse) {
+ } catch (JMSException jmse) {
if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
- //this we cannot recover from
- //but it does not invalidate the session
+
+ // this we cannot recover from
+ // but it does not invalidate the session
returnSessionToPool(sendSession);
+
throw jmse;
}
- //for now we will assume this is a reconnect related issue
- //and let the sender be collected
- //give the reconnect thread a chance to fill the pool
- Thread.yield();
- continue;
- }
- catch (NullPointerException npe) {
- //give the reconnect thread a chance to fill the pool
+
+ // For now, assume this is a reconnect-related issue
+ // and let the sender be collected; yield to
+ // give the reconnect thread a chance to fill the pool.
Thread.yield();
+
continue;
- }
- break;
- }
- }
+ } catch (NullPointerException npe) {
- protected void onException() {
- synchronized (m_senderLock) {
- m_senders.clear();
- }
- }
+ // give the reconnect thread a chance to fill the pool
+ Thread.yield();
- protected void onShutdown() {
- synchronized (m_senderLock) {
- Iterator senders = m_senders.iterator();
- while (senders.hasNext()) {
- SendSession session = (SendSession) senders.next();
- session.cleanup();
+ continue;
}
- m_senders.clear();
+
+ break;
}
}
@@ -530,84 +741,33 @@
while (m_senders.size() == 0) {
try {
m_senderLock.wait(timeout);
+
if (m_senders.size() == 0) {
return null;
}
- }
- catch (InterruptedException ignore) {
+ } catch (InterruptedException ignore) {
return null;
}
}
- return (SendSession) m_senders.removeFirst();
- }
- }
- private void returnSessionToPool(SendSession sendSession) {
- synchronized (m_senderLock) {
- m_senders.addLast(sendSession);
- m_senderLock.notifyAll();
+ return (SendSession) m_senders.removeFirst();
}
}
protected abstract class SendSession extends ConnectorSession {
MessageProducer m_producer;
- SendSession(Session session,
- MessageProducer producer)
- throws JMSException {
+ SendSession(Session session, MessageProducer producer) throws JMSException {
super(session);
m_producer = producer;
}
- protected abstract Destination createTemporaryDestination()
- throws JMSException;
-
- protected abstract void deleteTemporaryDestination(Destination destination)
- throws JMSException;
-
- protected abstract MessageConsumer createConsumer(Destination destination)
- throws JMSException;
-
- protected abstract void send(Destination destination,
- Message message,
- int deliveryMode,
- int priority,
- long timeToLive)
- throws JMSException;
-
- void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
- throws Exception {
- BytesMessage jmsMessage = m_session.createBytesMessage();
- jmsMessage.writeBytes(message);
- int deliveryMode = extractDeliveryMode(properties);
- int priority = extractPriority(properties);
- long timeToLive = extractTimeToLive(properties);
-
- if (properties != null && !properties.isEmpty())
- setProperties(properties, jmsMessage);
-
- send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
- priority, timeToLive);
- }
-
-
- void cleanup() {
- try {
- m_producer.close();
- } catch (Throwable t) {
- }
- try {
- m_session.close();
- } catch (Throwable t) {
- }
- }
-
- byte[] call(JMSEndpoint endpoint, byte[] message, long timeout,
- HashMap properties)
+ byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
throws Exception {
Destination reply = createTemporaryDestination();
MessageConsumer subscriber = createConsumer(reply);
BytesMessage jmsMessage = m_session.createBytesMessage();
+
jmsMessage.writeBytes(message);
jmsMessage.setJMSReplyTo(reply);
@@ -615,269 +775,124 @@
int priority = extractPriority(properties);
long timeToLive = extractTimeToLive(properties);
- if (properties != null && !properties.isEmpty())
+ if ((properties != null) && !properties.isEmpty()) {
setProperties(properties, jmsMessage);
+ }
+
+ send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority,
+ timeToLive);
- send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
- priority, timeToLive);
BytesMessage response = null;
+
try {
response = (BytesMessage) subscriber.receive(timeout);
} catch (ClassCastException cce) {
- throw new InvokeException
- ("Error: unexpected message type received - expected BytesMessage");
+ throw new InvokeException(
+ "Error: unexpected message type received - expected BytesMessage");
}
+
byte[] respBytes = null;
+
if (response != null) {
- byte[] buffer = new byte[8 * 1024];
+ byte[] buffer = new byte[8 * 1024];
ByteArrayOutputStream out = new ByteArrayOutputStream();
- for (int bytesRead = response.readBytes(buffer);
- bytesRead != -1; bytesRead = response.readBytes(buffer)) {
+
+ for (int bytesRead = response.readBytes(buffer); bytesRead != -1;
+ bytesRead = response.readBytes(buffer)) {
out.write(buffer, 0, bytesRead);
}
+
respBytes = out.toByteArray();
}
+
subscriber.close();
deleteTemporaryDestination(reply);
+
return respBytes;
}
- private int extractPriority(HashMap properties) {
- return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
- JMSConstants.DEFAULT_PRIORITY);
+ void cleanup() {
+ try {
+ m_producer.close();
+ } catch (Throwable t) {
+ }
+
+ try {
+ m_session.close();
+ } catch (Throwable t) {
+ }
}
+ protected abstract MessageConsumer createConsumer(Destination destination)
+ throws JMSException;
+
+ protected abstract Destination createTemporaryDestination() throws JMSException;
+
+ protected abstract void deleteTemporaryDestination(Destination destination)
+ throws JMSException;
+
private int extractDeliveryMode(HashMap properties) {
return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
JMSConstants.DEFAULT_DELIVERY_MODE);
}
+ private int extractPriority(HashMap properties) {
+ return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
+ JMSConstants.DEFAULT_PRIORITY);
+ }
+
private long extractTimeToLive(HashMap properties) {
return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
JMSConstants.DEFAULT_TIME_TO_LIVE);
}
- private void setProperties(HashMap properties, Message message)
- throws JMSException {
- Iterator propertyIter = properties.entrySet().iterator();
- while (propertyIter.hasNext()) {
- Map.Entry property = (Map.Entry) propertyIter.next();
- setProperty((String) property.getKey(), property.getValue(),
- message);
- }
- }
-
- private void setProperty(String property, Object value, Message message)
- throws JMSException {
- if (property == null)
- return;
- if (property.equals(JMSConstants.JMS_CORRELATION_ID))
- message.setJMSCorrelationID((String) value);
- else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES))
- message.setJMSCorrelationIDAsBytes((byte[]) value);
- else if (property.equals(JMSConstants.JMS_TYPE))
- message.setJMSType((String) value);
- else
- message.setObjectProperty(property, value);
- }
- }
- }
-
- AsyncConnection getReceiveConnection() {
- return m_receiveConnection;
- }
-
- protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
- javax.jms.Connection connection,
- String threadName,
- String clientID,
- String username,
- String password)
-
- throws JMSException;
-
- protected abstract class AsyncConnection extends Connection {
- HashMap m_subscriptions;
- Object m_subscriptionLock;
-
- protected AsyncConnection(ConnectionFactory connectionFactory,
- javax.jms.Connection connection,
- String threadName,
- String clientID,
- String username,
- String password)
- throws JMSException {
- super(connectionFactory, connection, threadName,
- clientID, username, password);
- m_subscriptions = new HashMap();
- m_subscriptionLock = new Object();
- }
+ void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception {
+ BytesMessage jmsMessage = m_session.createBytesMessage();
- protected abstract ListenerSession createListenerSession(
- javax.jms.Connection connection,
- Subscription subscription)
- throws Exception;
+ jmsMessage.writeBytes(message);
- protected void onShutdown() {
- synchronized (m_subscriptionLock) {
- Iterator subscriptions = m_subscriptions.keySet().iterator();
- while (subscriptions.hasNext()) {
- Subscription subscription = (Subscription) subscriptions.next();
- ListenerSession session = (ListenerSession)
- m_subscriptions.get(subscription);
- if (session != null) {
- session.cleanup();
- }
+ int deliveryMode = extractDeliveryMode(properties);
+ int priority = extractPriority(properties);
+ long timeToLive = extractTimeToLive(properties);
+ if ((properties != null) && !properties.isEmpty()) {
+ setProperties(properties, jmsMessage);
}
- m_subscriptions.clear();
- }
- }
-
- /**
- * @param subscription
- * @todo add in security exception propagation
- */
- void subscribe(Subscription subscription)
- throws Exception {
- long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
- synchronized (m_subscriptionLock) {
- if (m_subscriptions.containsKey(subscription))
- return;
- while (true) {
- if (System.currentTimeMillis() > timeoutTime) {
- throw new InvokeTimeoutException("Cannot subscribe listener");
- }
-
- try {
- ListenerSession session = createListenerSession(m_connection,
- subscription);
- m_subscriptions.put(subscription, session);
- break;
- }
- catch (JMSException jmse) {
- if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) {
- throw jmse;
- }
- try {
- m_subscriptionLock.wait(m_interactRetryInterval);
- }
- catch (InterruptedException ignore) {
- }
- //give reconnect a chance
- Thread.yield();
- continue;
- }
- catch (NullPointerException jmse) {
- //we ARE reconnecting
- try {
- m_subscriptionLock.wait(m_interactRetryInterval);
- }
- catch (InterruptedException ignore) {
- }
- //give reconnect a chance
- Thread.yield();
- continue;
- }
- }
+ send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority,
+ timeToLive);
}
- }
- void unsubscribe(Subscription subscription) {
- long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
- synchronized (m_subscriptionLock) {
- if (!m_subscriptions.containsKey(subscription))
- return;
- while (true) {
- if (System.currentTimeMillis() > timeoutTime) {
- throw new InvokeTimeoutException("Cannot unsubscribe listener");
- }
+ protected abstract void send(Destination destination, Message message,
+ int deliveryMode, int priority, long timeToLive)
+ throws JMSException;
- //give reconnect a chance
- Thread.yield();
- try {
- ListenerSession session = (ListenerSession)
- m_subscriptions.get(subscription);
- session.cleanup();
- m_subscriptions.remove(subscription);
- break;
- }
- catch (NullPointerException jmse) {
- //we are reconnecting
- try {
- m_subscriptionLock.wait(m_interactRetryInterval);
- }
- catch (InterruptedException ignore) {
- }
- continue;
- }
- }
- }
- }
+ private void setProperties(HashMap properties, Message message) throws JMSException {
+ Iterator propertyIter = properties.entrySet().iterator();
- protected void onConnect()
- throws Exception {
- synchronized (m_subscriptionLock) {
- Iterator subscriptions = m_subscriptions.keySet().iterator();
- while (subscriptions.hasNext()) {
- Subscription subscription = (Subscription) subscriptions.next();
+ while (propertyIter.hasNext()) {
+ Map.Entry property = (Map.Entry) propertyIter.next();
- if (m_subscriptions.get(subscription) == null) {
- m_subscriptions.put(subscription,
- createListenerSession(m_connection, subscription));
- }
+ setProperty((String) property.getKey(), property.getValue(), message);
}
- m_subscriptionLock.notifyAll();
}
- }
- protected void onException() {
- synchronized (m_subscriptionLock) {
- Iterator subscriptions = m_subscriptions.keySet().iterator();
- while (subscriptions.hasNext()) {
- Subscription subscription = (Subscription) subscriptions.next();
- m_subscriptions.put(subscription, null);
+ private void setProperty(String property, Object value, Message message)
+ throws JMSException {
+ if (property == null) {
+ return;
}
- }
- }
-
-
- protected class ListenerSession extends ConnectorSession {
- protected MessageConsumer m_consumer;
- protected Subscription m_subscription;
-
- ListenerSession(Session session,
- MessageConsumer consumer,
- Subscription subscription)
- throws Exception {
- super(session);
- m_subscription = subscription;
- m_consumer = consumer;
- Destination destination = subscription.m_endpoint.getDestination(m_session);
- m_consumer.setMessageListener(subscription.m_listener);
- }
- void cleanup() {
- try {
- m_consumer.close();
- } catch (Exception ignore) {
- }
- try {
- m_session.close();
- } catch (Exception ignore) {
+ if (property.equals(JMSConstants.JMS_CORRELATION_ID)) {
+ message.setJMSCorrelationID((String) value);
+ } else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES)) {
+ message.setJMSCorrelationIDAsBytes((byte[]) value);
+ } else if (property.equals(JMSConstants.JMS_TYPE)) {
+ message.setJMSType((String) value);
+ } else {
+ message.setObjectProperty(property, value);
}
}
-
- }
- }
-
- private abstract class ConnectorSession {
- Session m_session;
-
- ConnectorSession(Session session)
- throws JMSException {
- m_session = session;
}
}
-}
\ No newline at end of file
+}