Posted to dev@activemq.apache.org by "Sunny Liu (JIRA)" <ji...@apache.org> on 2007/09/10 20:17:22 UTC
[jira] Commented: (AMQ-1228) CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.
[ https://issues.apache.org/activemq/browse/AMQ-1228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_40117 ]
Sunny Liu commented on AMQ-1228:
--------------------------------
I have experienced the same problem with an OpenJMS connection. However, I have fixed it in org.apache.activemq.network.jms.JmsQueueConnector and org.apache.activemq.network.jms.JmsTopicConnector. I have tested the change and it works fine for me.
Here is the code.
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network.jms;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A Bridge to other JMS Topic providers
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.1.1.1 $
*/
public class JmsTopicConnector extends JmsConnector
implements ExceptionListener
{
private static final Log log=LogFactory.getLog(JmsTopicConnector.class);
private String outboundTopicConnectionFactoryName;
private String localConnectionFactoryName;
private TopicConnectionFactory outboundTopicConnectionFactory;
private TopicConnectionFactory localTopicConnectionFactory;
private TopicConnection outboundTopicConnection;
private TopicConnection localTopicConnection;
private InboundTopicBridge[] inboundTopicBridges;
private OutboundTopicBridge[] outboundTopicBridges;
public boolean init(){
boolean result=super.init();
if(result){
try{
initializeForeignTopicConnection();
initializeLocalTopicConnection();
initializeInboundJmsMessageConvertor();
initializeOutboundJmsMessageConvertor();
initializeInboundTopicBridges();
initializeOutboundTopicBridges();
}catch(Exception e){
log.error("Failed to initialize the JMSConnector",e);
}
}
return result;
}
protected boolean reInit()
{
boolean ret = false;
try{
if(outboundTopicConnectionFactoryName!=null){
this.outboundTopicConnection = null;
this.outboundTopicConnectionFactory = null;
}
initializeForeignTopicConnection();
initializeLocalTopicConnection();
initializeInboundJmsMessageConvertor();
initializeOutboundJmsMessageConvertor();
initializeInboundTopicBridges();
initializeOutboundTopicBridges();
ret = true;
}catch(Exception e){
ret = false;
log.error("Failed to initialize the JMSConnector",e);
}
return ret;
}
public void onException(JMSException jmsException)
{
if(started.get()) started.compareAndSet(true, false);
boolean initSuccess = false;
do{
initSuccess = reInit();
if(!initSuccess){
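log.warn("Still not able to connect to foreign server; waiting another 5 seconds before trying again.");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// Restore the interrupt flag and stop retrying so that shutdown is not blocked.
Thread.currentThread().interrupt();
return;
}
}else{
log.warn("Reconnected to foreign server successfully.");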
try {
this.start();
} catch (Exception e) {
initSuccess = false;
log.warn("Failed to restart.", e);
}
}
}while(!initSuccess);
}
/**
* @return Returns the inboundTopicBridges.
*/
public InboundTopicBridge[] getInboundTopicBridges(){
return inboundTopicBridges;
}
/**
* @param inboundTopicBridges
* The inboundTopicBridges to set.
*/
public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges){
this.inboundTopicBridges=inboundTopicBridges;
}
/**
* @return Returns the outboundTopicBridges.
*/
public OutboundTopicBridge[] getOutboundTopicBridges(){
return outboundTopicBridges;
}
/**
* @param outboundTopicBridges
* The outboundTopicBridges to set.
*/
public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges){
this.outboundTopicBridges=outboundTopicBridges;
}
/**
* @return Returns the localTopicConnectionFactory.
*/
public TopicConnectionFactory getLocalTopicConnectionFactory(){
return localTopicConnectionFactory;
}
/**
* @param localTopicConnectionFactory
* The localTopicConnectionFactory to set.
*/
public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory){
this.localTopicConnectionFactory=localConnectionFactory;
}
/**
* @return Returns the outboundTopicConnectionFactory.
*/
public TopicConnectionFactory getOutboundTopicConnectionFactory(){
return outboundTopicConnectionFactory;
}
/**
* @return Returns the outboundTopicConnectionFactoryName.
*/
public String getOutboundTopicConnectionFactoryName(){
return outboundTopicConnectionFactoryName;
}
/**
* @param outboundTopicConnectionFactoryName
* The outboundTopicConnectionFactoryName to set.
*/
public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName){
this.outboundTopicConnectionFactoryName=foreignTopicConnectionFactoryName;
}
/**
* @return Returns the localConnectionFactoryName.
*/
public String getLocalConnectionFactoryName(){
return localConnectionFactoryName;
}
/**
* @param localConnectionFactoryName
* The localConnectionFactoryName to set.
*/
public void setLocalConnectionFactoryName(String localConnectionFactoryName){
this.localConnectionFactoryName=localConnectionFactoryName;
}
/**
* @return Returns the localTopicConnection.
*/
public TopicConnection getLocalTopicConnection(){
return localTopicConnection;
}
/**
* @param localTopicConnection
* The localTopicConnection to set.
*/
public void setLocalTopicConnection(TopicConnection localTopicConnection){
this.localTopicConnection=localTopicConnection;
}
/**
* @return Returns the outboundTopicConnection.
*/
public TopicConnection getOutboundTopicConnection(){
return outboundTopicConnection;
}
/**
* @param outboundTopicConnection
* The outboundTopicConnection to set.
*/
public void setOutboundTopicConnection(TopicConnection foreignTopicConnection){
this.outboundTopicConnection=foreignTopicConnection;
}
/**
* @param outboundTopicConnectionFactory
* The outboundTopicConnectionFactory to set.
*/
public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory){
this.outboundTopicConnectionFactory=foreignTopicConnectionFactory;
}
public void restartProducerConnection() throws NamingException, JMSException {
outboundTopicConnection = null;
initializeForeignTopicConnection();
}
protected void initializeForeignTopicConnection() throws NamingException,JMSException{
if(outboundTopicConnection==null){
// get the connection factories
if(outboundTopicConnectionFactory==null){
// look it up from JNDI
if(outboundTopicConnectionFactoryName!=null){
outboundTopicConnectionFactory=(TopicConnectionFactory) jndiOutboundTemplate.lookup(
outboundTopicConnectionFactoryName,TopicConnectionFactory.class);
if(outboundUsername!=null){
outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername,
outboundPassword);
}else{
outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
}
}else {
throw new JMSException("Cannot create foreignConnection - no information");
}
}else {
if(outboundUsername!=null){
outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername,
outboundPassword);
}else{
outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
}
}
}
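outboundTopicConnection.start();
// Register this connector so that onException() is called when the foreign broker drops the connection.
outboundTopicConnection.setExceptionListener(this);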
}
protected void initializeLocalTopicConnection() throws NamingException,JMSException{
if(localTopicConnection==null){
// get the connection factories
if(localTopicConnectionFactory==null){
if(embeddedConnectionFactory==null){
// look it up from JNDI
if(localConnectionFactoryName!=null){
localTopicConnectionFactory=(TopicConnectionFactory) jndiLocalTemplate.lookup(
localConnectionFactoryName,TopicConnectionFactory.class);
if(localUsername!=null){
localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername,
localPassword);
}else{
localTopicConnection=localTopicConnectionFactory.createTopicConnection();
}
}else {
throw new JMSException("Cannot create localConnection - no information");
}
}else{
localTopicConnection = embeddedConnectionFactory.createTopicConnection();
}
}else {
if(localUsername!=null){
localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername,
localPassword);
}else{
localTopicConnection=localTopicConnectionFactory.createTopicConnection();
}
}
}
localTopicConnection.start();
}
protected void initializeInboundJmsMessageConvertor(){
inboundMessageConvertor.setConnection(localTopicConnection);
}
protected void initializeOutboundJmsMessageConvertor(){
outboundMessageConvertor.setConnection(outboundTopicConnection);
}
protected void initializeInboundTopicBridges() throws JMSException{
if(inboundTopicBridges!=null){
TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
for(int i=0;i<inboundTopicBridges.length;i++){
InboundTopicBridge bridge=inboundTopicBridges[i];
String localTopicName=bridge.getLocalTopicName();
Topic activemqTopic=createActiveMQTopic(localSession,localTopicName);
String topicName=bridge.getInboundTopicName();
Topic foreignTopic=createForeignTopic(outboundSession,topicName);
bridge.setConsumerTopic(foreignTopic);
bridge.setProducerTopic(activemqTopic);
bridge.setProducerConnection(localTopicConnection);
bridge.setConsumerConnection(outboundTopicConnection);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
}
bridge.setJmsConnector(this);
addInboundBridge(bridge);
}
outboundSession.close();
localSession.close();
}
}
protected void initializeOutboundTopicBridges() throws JMSException{
if(outboundTopicBridges!=null){
TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
for(int i=0;i<outboundTopicBridges.length;i++){
OutboundTopicBridge bridge=outboundTopicBridges[i];
String localTopicName=bridge.getLocalTopicName();
Topic activemqTopic=createActiveMQTopic(localSession,localTopicName);
String topicName=bridge.getOutboundTopicName();
Topic foreignTopic=createForeignTopic(outboundSession,topicName);
bridge.setConsumerTopic(activemqTopic);
bridge.setProducerTopic(foreignTopic);
bridge.setProducerConnection(outboundTopicConnection);
bridge.setConsumerConnection(localTopicConnection);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
bridge.setJmsConnector(this);
addOutboundBridge(bridge);
}
outboundSession.close();
localSession.close();
}
}
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){
Topic replyToProducerTopic =(Topic)destination;
boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
if(isInbound){
InboundTopicBridge bridge = (InboundTopicBridge) replyToBridges.get(replyToProducerTopic);
if (bridge == null){
bridge = new InboundTopicBridge(){
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
replyToConsumerSession.close();
bridge.setConsumerTopic(replyToConsumerTopic);
bridge.setProducerTopic(replyToProducerTopic);
bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + replyToProducerTopic);
}catch(Exception e){
log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
return null;
}
replyToBridges.put(replyToProducerTopic, bridge);
}
return bridge.getConsumerTopic();
}else{
OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(replyToProducerTopic);
if (bridge == null){
bridge = new OutboundTopicBridge(){
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
replyToConsumerSession.close();
bridge.setConsumerTopic(replyToConsumerTopic);
bridge.setProducerTopic(replyToProducerTopic);
bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + replyToProducerTopic);
}catch(Exception e){
log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
return null;
}
replyToBridges.put(replyToProducerTopic, bridge);
}
return bridge.getConsumerTopic();
}
}
protected Topic createActiveMQTopic(TopicSession session,String topicName) throws JMSException{
return session.createTopic(topicName);
}
protected Topic createForeignTopic(TopicSession session,String topicName) throws JMSException{
Topic result = null;
try{
result = session.createTopic(topicName);
}catch(JMSException e){
//look-up the Topic
try{
result = (Topic) jndiOutboundTemplate.lookup(topicName, Topic.class);
}catch(NamingException e1){
String errStr = "Failed to look-up Topic for name: " + topicName;
log.error(errStr,e);
JMSException jmsEx = new JMSException(errStr);
jmsEx.setLinkedException(e1);
throw jmsEx;
}
}
return result;
}
}
///======================================================
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network.jms;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
/**
* A Bridge to other JMS Queue providers
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.1.1.1 $
*/
public class JmsQueueConnector extends JmsConnector
implements ExceptionListener
{
private static final Log log=LogFactory.getLog(JmsQueueConnector.class);
private String outboundQueueConnectionFactoryName;
private String localConnectionFactoryName;
private QueueConnectionFactory outboundQueueConnectionFactory;
private QueueConnectionFactory localQueueConnectionFactory;
private QueueConnection outboundQueueConnection;
private QueueConnection localQueueConnection;
private InboundQueueBridge[] inboundQueueBridges;
private OutboundQueueBridge[] outboundQueueBridges;
public boolean init(){
boolean result=super.init();
if(result){
try{
initializeForeignQueueConnection();
initializeLocalQueueConnection();
initializeInboundJmsMessageConvertor();
initializeOutboundJmsMessageConvertor();
initializeInboundQueueBridges();
initializeOutboundQueueBridges();
}catch(Exception e){
log.error("Failed to initialize the JMSConnector",e);
}
}
return result;
}
protected boolean reInit()
{
boolean ret = false;
try{
if(outboundQueueConnectionFactoryName!=null){
this.outboundQueueConnection = null;
this.outboundQueueConnectionFactory = null;
}
initializeForeignQueueConnection();
initializeLocalQueueConnection();
initializeInboundJmsMessageConvertor();
initializeOutboundJmsMessageConvertor();
initializeInboundQueueBridges();
initializeOutboundQueueBridges();
ret = true;
}catch(Exception e){
ret = false;
log.error("Failed to initialize the JMSConnector",e);
}
return ret;
}
public void onException(JMSException jmsException)
{
if(started.get()) started.compareAndSet(true, false);
boolean initSuccess = false;
do{
initSuccess = reInit();
if(!initSuccess){
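log.warn("Still not able to connect to foreign server; waiting another 5 seconds before trying again.");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// Restore the interrupt flag and stop retrying so that shutdown is not blocked.
Thread.currentThread().interrupt();
return;
}
}else{
log.warn("Reconnected to foreign server successfully.");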
try {
this.start();
} catch (Exception e) {
initSuccess = false;
log.warn("Failed to restart.", e);
}
}
}while(!initSuccess);
}
/**
* @return Returns the inboundQueueBridges.
*/
public InboundQueueBridge[] getInboundQueueBridges(){
return inboundQueueBridges;
}
/**
* @param inboundQueueBridges
* The inboundQueueBridges to set.
*/
public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges){
this.inboundQueueBridges=inboundQueueBridges;
}
/**
* @return Returns the outboundQueueBridges.
*/
public OutboundQueueBridge[] getOutboundQueueBridges(){
return outboundQueueBridges;
}
/**
* @param outboundQueueBridges
* The outboundQueueBridges to set.
*/
public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges){
this.outboundQueueBridges=outboundQueueBridges;
}
/**
* @return Returns the localQueueConnectionFactory.
*/
public QueueConnectionFactory getLocalQueueConnectionFactory(){
return localQueueConnectionFactory;
}
/**
* @param localQueueConnectionFactory
* The localQueueConnectionFactory to set.
*/
public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory){
this.localQueueConnectionFactory=localConnectionFactory;
}
/**
* @return Returns the outboundQueueConnectionFactory.
*/
public QueueConnectionFactory getOutboundQueueConnectionFactory(){
return outboundQueueConnectionFactory;
}
/**
* @return Returns the outboundQueueConnectionFactoryName.
*/
public String getOutboundQueueConnectionFactoryName(){
return outboundQueueConnectionFactoryName;
}
/**
* @param outboundQueueConnectionFactoryName
* The outboundQueueConnectionFactoryName to set.
*/
public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName){
this.outboundQueueConnectionFactoryName=foreignQueueConnectionFactoryName;
}
/**
* @return Returns the localConnectionFactoryName.
*/
public String getLocalConnectionFactoryName(){
return localConnectionFactoryName;
}
/**
* @param localConnectionFactoryName
* The localConnectionFactoryName to set.
*/
public void setLocalConnectionFactoryName(String localConnectionFactoryName){
this.localConnectionFactoryName=localConnectionFactoryName;
}
/**
* @return Returns the localQueueConnection.
*/
public QueueConnection getLocalQueueConnection(){
return localQueueConnection;
}
/**
* @param localQueueConnection
* The localQueueConnection to set.
*/
public void setLocalQueueConnection(QueueConnection localQueueConnection){
this.localQueueConnection=localQueueConnection;
}
/**
* @return Returns the outboundQueueConnection.
*/
public QueueConnection getOutboundQueueConnection(){
return outboundQueueConnection;
}
/**
* @param outboundQueueConnection
* The outboundQueueConnection to set.
*/
public void setOutboundQueueConnection(QueueConnection foreignQueueConnection){
this.outboundQueueConnection=foreignQueueConnection;
}
/**
* @param outboundQueueConnectionFactory
* The outboundQueueConnectionFactory to set.
*/
public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory){
this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
}
public void restartProducerConnection() throws NamingException, JMSException {
outboundQueueConnection = null;
initializeForeignQueueConnection();
}
protected void initializeForeignQueueConnection() throws NamingException,JMSException{
if(outboundQueueConnection==null){
// get the connection factories
if(outboundQueueConnectionFactory==null){
// look it up from JNDI
if(outboundQueueConnectionFactoryName!=null){
outboundQueueConnectionFactory=(QueueConnectionFactory) jndiOutboundTemplate.lookup(
outboundQueueConnectionFactoryName,QueueConnectionFactory.class);
if(outboundUsername!=null){
outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername,
outboundPassword);
}else{
outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
}
}else {
throw new JMSException("Cannot create foreignConnection - no information");
}
}else {
if(outboundUsername!=null){
outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername,
outboundPassword);
}else{
outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
}
}
}
outboundQueueConnection.start();
outboundQueueConnection.setExceptionListener(this);
}
protected void initializeLocalQueueConnection() throws NamingException,JMSException{
if(localQueueConnection==null){
// get the connection factories
if(localQueueConnectionFactory==null){
if(embeddedConnectionFactory==null){
// look it up from JNDI
if(localConnectionFactoryName!=null){
localQueueConnectionFactory=(QueueConnectionFactory) jndiLocalTemplate.lookup(
localConnectionFactoryName,QueueConnectionFactory.class);
if(localUsername!=null){
localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername,
localPassword);
}else{
localQueueConnection=localQueueConnectionFactory.createQueueConnection();
}
}else {
throw new JMSException("Cannot create localConnection - no information");
}
}else{
localQueueConnection = embeddedConnectionFactory.createQueueConnection();
}
}else {
if(localUsername!=null){
localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername,
localPassword);
}else{
localQueueConnection=localQueueConnectionFactory.createQueueConnection();
}
}
}
localQueueConnection.start();
}
protected void initializeInboundJmsMessageConvertor(){
inboundMessageConvertor.setConnection(localQueueConnection);
}
protected void initializeOutboundJmsMessageConvertor(){
outboundMessageConvertor.setConnection(outboundQueueConnection);
}
protected void initializeInboundQueueBridges() throws JMSException{
if(inboundQueueBridges!=null){
QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
for(int i=0;i<inboundQueueBridges.length;i++){
InboundQueueBridge bridge=inboundQueueBridges[i];
String localQueueName=bridge.getLocalQueueName();
Queue activemqQueue=createActiveMQQueue(localSession,localQueueName);
String queueName = bridge.getInboundQueueName();
Queue foreignQueue=createForeignQueue(outboundSession,queueName);
bridge.setConsumerQueue(foreignQueue);
bridge.setProducerQueue(activemqQueue);
bridge.setProducerConnection(localQueueConnection);
bridge.setConsumerConnection(outboundQueueConnection);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
}
bridge.setJmsConnector(this);
addInboundBridge(bridge);
}
outboundSession.close();
localSession.close();
}
}
protected void initializeOutboundQueueBridges() throws JMSException{
if(outboundQueueBridges!=null){
QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
for(int i=0;i<outboundQueueBridges.length;i++){
OutboundQueueBridge bridge=outboundQueueBridges[i];
String localQueueName=bridge.getLocalQueueName();
Queue activemqQueue=createActiveMQQueue(localSession,localQueueName);
String queueName=bridge.getOutboundQueueName();
Queue foreignQueue=createForeignQueue(outboundSession,queueName);
bridge.setConsumerQueue(activemqQueue);
bridge.setProducerQueue(foreignQueue);
bridge.setProducerConnection(outboundQueueConnection);
bridge.setConsumerConnection(localQueueConnection);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
bridge.setJmsConnector(this);
addOutboundBridge(bridge);
}
outboundSession.close();
localSession.close();
}
}
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){
Queue replyToProducerQueue =(Queue)destination;
boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
if(isInbound){
InboundQueueBridge bridge = (InboundQueueBridge) replyToBridges.get(replyToProducerQueue);
if (bridge == null){
bridge = new InboundQueueBridge(){
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
replyToConsumerSession.close();
bridge.setConsumerQueue(replyToConsumerQueue);
bridge.setProducerQueue(replyToProducerQueue);
bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + replyToProducerQueue);
}catch(Exception e){
log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
return null;
}
replyToBridges.put(replyToProducerQueue, bridge);
}
return bridge.getConsumerQueue();
}else{
OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(replyToProducerQueue);
if (bridge == null){
bridge = new OutboundQueueBridge(){
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
replyToConsumerSession.close();
bridge.setConsumerQueue(replyToConsumerQueue);
bridge.setProducerQueue(replyToProducerQueue);
bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + replyToProducerQueue);
}catch(Exception e){
log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
return null;
}
replyToBridges.put(replyToProducerQueue, bridge);
}
return bridge.getConsumerQueue();
}
}
protected Queue createActiveMQQueue(QueueSession session,String queueName) throws JMSException{
return session.createQueue(queueName);
}
protected Queue createForeignQueue(QueueSession session,String queueName) throws JMSException{
Queue result = null;
try{
result = session.createQueue(queueName);
}catch(JMSException e){
//look-up the Queue
try{
result = (Queue) jndiOutboundTemplate.lookup(queueName, Queue.class);
}catch(NamingException e1){
String errStr = "Failed to look-up Queue for name: " + queueName;
log.error(errStr,e);
JMSException jmsEx = new JMSException(errStr);
jmsEx.setLinkedException(e1);
throw jmsEx;
}
}
return result;
}
}
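For anyone who wants to try the retry idea outside the broker, the loop in onException can be sketched in isolation roughly as follows. This is only a minimal sketch, not part of the patch: the class name ReconnectLoop, the method retryUntilSuccess, and the Callable<Boolean> that stands in for "reInit() followed by start()" are all hypothetical names made up for illustration. It also stops retrying if the thread is interrupted, so a shutdown is not blocked by the loop.

```java
import java.util.concurrent.Callable;

public class ReconnectLoop {

    /**
     * Calls attempt until it returns true, sleeping delayMillis between tries.
     * Returns the number of attempts made, or -1 if the thread was interrupted
     * while waiting. The Callable is a hypothetical stand-in for the
     * connector's reInit()/start() sequence.
     */
    public static int retryUntilSuccess(Callable<Boolean> attempt, long delayMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            boolean ok;
            try {
                ok = Boolean.TRUE.equals(attempt.call());
            } catch (Exception e) {
                // Treat any failure as "not connected yet" and try again.
                ok = false;
            }
            if (ok) {
                return attempts;
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and give up.
                Thread.currentThread().interrupt();
                return -1;
            }
        }
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Hypothetical attempt that fails twice and then succeeds.
        int attempts = retryUntilSuccess(() -> ++calls[0] >= 3, 10L);
        System.out.println("connected after " + attempts + " attempts");
    }
}
```

In the connector itself the delay would be the 5000 ms used above. Running the loop on a separate thread rather than inside the JMS provider's exception callback may be worth considering, but that is a design choice the patch does not make.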
> CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.
> ------------------------------------------------------------------------------------------------------
>
> Key: AMQ-1228
> URL: https://issues.apache.org/activemq/browse/AMQ-1228
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 4.0 RC2, 4.0.1
> Reporter: William MacDonald
>
> I'm using ActiveMQ (4.0.1) JMS to JMS Bridge functionality to connect to a SunMQ JMS Broker (3.6 SP3 (Build 02-A)). I'm using two queues, an input and an output one, with the following configuration:
> <jmsBridgeConnectors>
> <jmsQueueConnector outboundQueueConnectionFactory="#REMOTE">
> <outboundQueueBridges>
> <outboundQueueBridge outboundQueueName="SUNRECV"/>
> </outboundQueueBridges>
> <inboundQueueBridges>
> <inboundQueueBridge inboundQueueName="SUNSEND"/>
> </inboundQueueBridges>
> </jmsQueueConnector>
> </jmsBridgeConnectors>
> The system worked really well until the SunMQ broker needed to be restarted. This is what I found:
> 1. ActiveMQ is not aware of the remote broker shutdown. I waited for a while, but nothing in the ActiveMQ log indicates knowledge of the new situation.
> 2. When I send a message to the output queue SUNRECV, ActiveMQ complains that the producer is closed:
> [ERROR][2006/08/25.09:47:12.039][ActiveMQ Session Task]failed to forward message: ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:trabucco-43457-1156491843149-3:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:trabucco-43457-1156491843149-3:4:1:1, destination = queue://SUNRECV, transactionId = null, expiration = 0, timestamp = 1156492032027, arrival = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 2, properties = null, readOnlyProperties = true, readOnlyBody = true, text = 1}([C4064]: Cannot perform operation, producer is closed.)
> After this, it is automatically queueing messages without sending them, showing the log:
> [DEBUG][2006/08/25.09:47:42.721][RMI TCP Connection(4)-10.95.89.20]No subscriptions registered, will not dispatch message at this time.
> Even if SunMQ is started again, ActiveMQ is not detecting the new situation, and continues queueing messages sent to SUNRECV.
> Please let me know if more information is needed to understand the situation.