Posted to dev@activemq.apache.org by "Sunny Liu (JIRA)" <ji...@apache.org> on 2007/09/10 20:17:22 UTC

[jira] Commented: (AMQ-1228) CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.

    [ https://issues.apache.org/activemq/browse/AMQ-1228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_40117 ] 

Sunny Liu commented on AMQ-1228:
--------------------------------

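I have experienced the same problem with an OpenJMS connection. I have fixed it in org.apache.activemq.network.jms.JmsQueueConnector and org.apache.activemq.network.jms.JmsTopicConnector: both connectors now implement ExceptionListener and, when the foreign connection fails, re-initialize the connections and bridges every 5 seconds until the reconnect succeeds. I have tested the change and it works fine for me.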
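
The retry behaviour described above can be sketched without any JMS dependency. This is only a minimal illustration under assumptions: the class name ReconnectLoop and the method retryUntilSuccess are hypothetical and not part of ActiveMQ, and the BooleanSupplier stands in for a reInit()-style call that returns true once the foreign broker is reachable again.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of ActiveMQ: retries an attempt with a fixed delay. */
public final class ReconnectLoop {

    private ReconnectLoop() {
    }

    /**
     * Calls attempt until it returns true, sleeping delayMillis after each failure.
     *
     * @return the number of attempts made, or -1 if the thread was interrupted while waiting
     */
    public static int retryUntilSuccess(BooleanSupplier attempt, long delayMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (attempt.getAsBoolean()) {
                return attempts;
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop retrying so the caller can shut down
                Thread.currentThread().interrupt();
                return -1;
            }
        }
    }

    public static void main(String[] args) {
        // simulated broker that becomes reachable on the third attempt
        int[] calls = {0};
        int attempts = retryUntilSuccess(() -> ++calls[0] >= 3, 10L);
        System.out.println("connected after " + attempts + " attempts");
    }
}
```

Unlike a loop that swallows every exception from Thread.sleep, this sketch stops retrying when the thread is interrupted, which is one way to keep the broker able to shut down while the foreign server is still unreachable.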

Here is the code for both classes.

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Bridge to other JMS Topic providers
 * 
 * @org.apache.xbean.XBean
 * 
 * @version $Revision: 1.1.1.1 $
 */
public class JmsTopicConnector extends JmsConnector
implements ExceptionListener
{
    private static final Log log=LogFactory.getLog(JmsTopicConnector.class);
    private String outboundTopicConnectionFactoryName;
    private String localConnectionFactoryName;
    private TopicConnectionFactory outboundTopicConnectionFactory;
    private TopicConnectionFactory localTopicConnectionFactory;
    private TopicConnection outboundTopicConnection;
    private TopicConnection localTopicConnection;
    private InboundTopicBridge[] inboundTopicBridges;
    private OutboundTopicBridge[] outboundTopicBridges;
    
    public boolean init(){
        boolean result=super.init();
        if(result){
            try{
                initializeForeignTopicConnection();
                initializeLocalTopicConnection();
                initializeInboundJmsMessageConvertor();
                initializeOutboundJmsMessageConvertor();
                initializeInboundTopicBridges();
                initializeOutboundTopicBridges();
            }catch(Exception e){
                log.error("Failed to initialize the JMSConnector",e);
            }
        }
        return result;
    }   
    

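    /**
     * Drops the cached foreign connection and rebuilds the connections,
     * message convertors and bridges.
     *
     * @return true if re-initialization succeeded
     */
    protected boolean reInit()
    {
        boolean ret = false;
        try{
            // always discard the broken connection so that a new one is created
            this.outboundTopicConnection = null;
            if(outboundTopicConnectionFactoryName!=null){
                // the factory can be looked up again from JNDI
                this.outboundTopicConnectionFactory = null;
            }
            initializeForeignTopicConnection();
            initializeLocalTopicConnection();
            initializeInboundJmsMessageConvertor();
            initializeOutboundJmsMessageConvertor();
            initializeInboundTopicBridges();
            initializeOutboundTopicBridges();
            ret = true;
        }catch(Exception e){
            ret = false;
            log.error("Failed to re-initialize the JMSConnector",e);
        }
        return ret;
    }

    /**
     * Called by the JMS provider when the foreign connection fails. Stops the
     * connector and retries re-initialization every 5 seconds until it succeeds.
     */
    public void onException(JMSException jmsException)
    {
        log.warn("Lost connection to foreign server, trying to reconnect.", jmsException);
        started.compareAndSet(true, false);
        boolean initSuccess = false;
        do{
            initSuccess = reInit();
            if(!initSuccess){
                log.warn("Still not able to connect to foreign server, waiting another 5 seconds before retrying.");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // restore the interrupt flag and give up so the thread can shut down
                    Thread.currentThread().interrupt();
                    return;
                }
            }else{
                log.info("Reconnected to foreign server successfully.");
                try {
                    this.start();
                } catch (Exception e) {
                    initSuccess = false;
                    log.warn("Failed to restart.", e);
                }
            }
        }while(!initSuccess);
    }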
    
    /**
     * @return Returns the inboundTopicBridges.
     */
    public InboundTopicBridge[] getInboundTopicBridges(){
        return inboundTopicBridges;
    }

    /**
     * @param inboundTopicBridges
     *            The inboundTopicBridges to set.
     */
    public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges){
        this.inboundTopicBridges=inboundTopicBridges;
    }

    /**
     * @return Returns the outboundTopicBridges.
     */
    public OutboundTopicBridge[] getOutboundTopicBridges(){
        return outboundTopicBridges;
    }

    /**
     * @param outboundTopicBridges
     *            The outboundTopicBridges to set.
     */
    public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges){
        this.outboundTopicBridges=outboundTopicBridges;
    }

    /**
     * @return Returns the localTopicConnectionFactory.
     */
    public TopicConnectionFactory getLocalTopicConnectionFactory(){
        return localTopicConnectionFactory;
    }

    /**
     * @param localTopicConnectionFactory
     *            The localTopicConnectionFactory to set.
     */
    public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory){
        this.localTopicConnectionFactory=localConnectionFactory;
    }

    /**
     * @return Returns the outboundTopicConnectionFactory.
     */
    public TopicConnectionFactory getOutboundTopicConnectionFactory(){
        return outboundTopicConnectionFactory;
    }

    /**
     * @return Returns the outboundTopicConnectionFactoryName.
     */
    public String getOutboundTopicConnectionFactoryName(){
        return outboundTopicConnectionFactoryName;
    }

    /**
     * @param outboundTopicConnectionFactoryName
     *            The outboundTopicConnectionFactoryName to set.
     */
    public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName){
        this.outboundTopicConnectionFactoryName=foreignTopicConnectionFactoryName;
    }

    /**
     * @return Returns the localConnectionFactoryName.
     */
    public String getLocalConnectionFactoryName(){
        return localConnectionFactoryName;
    }

    /**
     * @param localConnectionFactoryName
     *            The localConnectionFactoryName to set.
     */
    public void setLocalConnectionFactoryName(String localConnectionFactoryName){
        this.localConnectionFactoryName=localConnectionFactoryName;
    }

    /**
     * @return Returns the localTopicConnection.
     */
    public TopicConnection getLocalTopicConnection(){
        return localTopicConnection;
    }

    /**
     * @param localTopicConnection
     *            The localTopicConnection to set.
     */
    public void setLocalTopicConnection(TopicConnection localTopicConnection){
        this.localTopicConnection=localTopicConnection;
    }

    /**
     * @return Returns the outboundTopicConnection.
     */
    public TopicConnection getOutboundTopicConnection(){
        return outboundTopicConnection;
    }

    /**
     * @param outboundTopicConnection
     *            The outboundTopicConnection to set.
     */
    public void setOutboundTopicConnection(TopicConnection foreignTopicConnection){
        this.outboundTopicConnection=foreignTopicConnection;
    }

    /**
     * @param outboundTopicConnectionFactory
     *            The outboundTopicConnectionFactory to set.
     */
    public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory){
        this.outboundTopicConnectionFactory=foreignTopicConnectionFactory;
    }


    public void restartProducerConnection() throws NamingException, JMSException {
        outboundTopicConnection = null;
        initializeForeignTopicConnection();
    }

    protected void initializeForeignTopicConnection() throws NamingException,JMSException{
        if(outboundTopicConnection==null){
            // get the connection factories
            if(outboundTopicConnectionFactory==null){
                // look it up from JNDI
                if(outboundTopicConnectionFactoryName!=null){
                    outboundTopicConnectionFactory=(TopicConnectionFactory) jndiOutboundTemplate.lookup(
                                    outboundTopicConnectionFactoryName,TopicConnectionFactory.class);
                    if(outboundUsername!=null){
                        outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername,
                                        outboundPassword);
                    }else{
                        outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
                    }
                }else {
                    throw new JMSException("Cannot create foreignConnection - no information");
                }
            }else {
                if(outboundUsername!=null){
                    outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername,
                                    outboundPassword);
                }else{
                    outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
                }
            }
        }
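        outboundTopicConnection.start();
        // register for connection failures so that onException() can trigger a reconnect
        outboundTopicConnection.setExceptionListener(this);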
    }

    protected void initializeLocalTopicConnection() throws NamingException,JMSException{
        if(localTopicConnection==null){
            // get the connection factories
            if(localTopicConnectionFactory==null){
                if(embeddedConnectionFactory==null){
                    // look it up from JNDI
                    if(localConnectionFactoryName!=null){
                        localTopicConnectionFactory=(TopicConnectionFactory) jndiLocalTemplate.lookup(
                                        localConnectionFactoryName,TopicConnectionFactory.class);
                        if(localUsername!=null){
                            localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername,
                                            localPassword);
                        }else{
                            localTopicConnection=localTopicConnectionFactory.createTopicConnection();
                        }
                    }else {
                        throw new JMSException("Cannot create localConnection - no information");
                    }
                }else{
                    localTopicConnection = embeddedConnectionFactory.createTopicConnection();
                }
            }else {
                if(localUsername!=null){
                    localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername,
                                    localPassword);
                }else{
                    localTopicConnection=localTopicConnectionFactory.createTopicConnection();
                }
            }
        }
        localTopicConnection.start();
    }
    
    protected void initializeInboundJmsMessageConvertor(){
    	inboundMessageConvertor.setConnection(localTopicConnection);
    }
    
    protected void initializeOutboundJmsMessageConvertor(){
    	outboundMessageConvertor.setConnection(outboundTopicConnection);
    }

    protected void initializeInboundTopicBridges() throws JMSException{
        if(inboundTopicBridges!=null){
            TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<inboundTopicBridges.length;i++){
                InboundTopicBridge bridge=inboundTopicBridges[i];
                String localTopicName=bridge.getLocalTopicName();          
                Topic activemqTopic=createActiveMQTopic(localSession,localTopicName);
                String topicName=bridge.getInboundTopicName();
                Topic foreignTopic=createForeignTopic(outboundSession,topicName);
                bridge.setConsumerTopic(foreignTopic);
                bridge.setProducerTopic(activemqTopic);
                bridge.setProducerConnection(localTopicConnection);
                bridge.setConsumerConnection(outboundTopicConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addInboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }

    protected void initializeOutboundTopicBridges() throws JMSException{
        if(outboundTopicBridges!=null){
            TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<outboundTopicBridges.length;i++){
                OutboundTopicBridge bridge=outboundTopicBridges[i];
                String localTopicName=bridge.getLocalTopicName();
                Topic activemqTopic=createActiveMQTopic(localSession,localTopicName);
                String topicName=bridge.getOutboundTopicName();
                Topic foreignTopic=createForeignTopic(outboundSession,topicName);
                bridge.setConsumerTopic(activemqTopic);
                bridge.setProducerTopic(foreignTopic);
                bridge.setProducerConnection(outboundTopicConnection);
                bridge.setConsumerConnection(localTopicConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addOutboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }
    
    protected  Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){
    	Topic replyToProducerTopic =(Topic)destination;
    	boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
    	
    	if(isInbound){
    		InboundTopicBridge bridge = (InboundTopicBridge) replyToBridges.get(replyToProducerTopic);
    		if (bridge == null){
    			bridge = new InboundTopicBridge(){
    				protected Destination processReplyToDestination (Destination destination){
    					return null;
    				}
    			};
    			try{
    				TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
    				Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
    				replyToConsumerSession.close();
    				bridge.setConsumerTopic(replyToConsumerTopic);
    				bridge.setProducerTopic(replyToProducerTopic);
    				bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
    				bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
    				bridge.setDoHandleReplyTo(false);
    				if(bridge.getJmsMessageConvertor()==null){
    					bridge.setJmsMessageConvertor(getInboundMessageConvertor());
    				}
    				bridge.setJmsConnector(this);
    				bridge.start();
    				log.info("Created replyTo bridge for " + replyToProducerTopic);
    			}catch(Exception e){
    				log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
    				return null;
    			}
    			replyToBridges.put(replyToProducerTopic, bridge);
    		}
    		return bridge.getConsumerTopic();
    	}else{
    		OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(replyToProducerTopic);
    		if (bridge == null){
    			bridge = new OutboundTopicBridge(){
    				protected Destination processReplyToDestination (Destination destination){
    					return null;
    				}
    			};
    			try{
    				TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
    				Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
    				replyToConsumerSession.close();
    				bridge.setConsumerTopic(replyToConsumerTopic);
    				bridge.setProducerTopic(replyToProducerTopic);
    				bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
    				bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
    				bridge.setDoHandleReplyTo(false);
    				if(bridge.getJmsMessageConvertor()==null){
    					bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
    				}
    				bridge.setJmsConnector(this);
    				bridge.start();
    				log.info("Created replyTo bridge for " + replyToProducerTopic);
    			}catch(Exception e){
    				log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
    				return null;
    			}
    			replyToBridges.put(replyToProducerTopic, bridge);
    		}
    		return bridge.getConsumerTopic();
    	}		
    }
    
    protected Topic createActiveMQTopic(TopicSession session,String topicName) throws JMSException{
        return session.createTopic(topicName);
    }
    
    protected Topic createForeignTopic(TopicSession session,String topicName) throws JMSException{
        Topic result = null;
        try{
            result = session.createTopic(topicName);
        }catch(JMSException e){
            //look-up the Topic
            try{
                result = (Topic) jndiOutboundTemplate.lookup(topicName, Topic.class);
            }catch(NamingException e1){
                String errStr = "Failed to look-up Topic for name: " + topicName;
                log.error(errStr,e1);
                JMSException jmsEx =  new JMSException(errStr);
                jmsEx.setLinkedException(e1);
                throw jmsEx;
            }
        }
        return result;
    }

    
}


///======================================================


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
/**
 * A Bridge to other JMS Queue providers
 * 
 * @org.apache.xbean.XBean
 *
 * @version $Revision: 1.1.1.1 $
 */
public class JmsQueueConnector extends JmsConnector
implements ExceptionListener
{
    private static final Log log=LogFactory.getLog(JmsQueueConnector.class);
    private String outboundQueueConnectionFactoryName;
    private String localConnectionFactoryName;
    
    private QueueConnectionFactory outboundQueueConnectionFactory;
    private QueueConnectionFactory localQueueConnectionFactory;
    
    private QueueConnection outboundQueueConnection;
    private QueueConnection localQueueConnection;
    
    private InboundQueueBridge[] inboundQueueBridges;
    private OutboundQueueBridge[] outboundQueueBridges;   

    public boolean init(){
        boolean result=super.init();
        if(result){
            try{
                initializeForeignQueueConnection();
                initializeLocalQueueConnection();
                initializeInboundJmsMessageConvertor();
                initializeOutboundJmsMessageConvertor();
                initializeInboundQueueBridges();
                initializeOutboundQueueBridges();
            }catch(Exception e){                
            	log.error("Failed to initialize the JMSConnector",e);
            }
        }
        return result;
    }   
    
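    /**
     * Drops the cached foreign connection and rebuilds the connections,
     * message convertors and bridges.
     *
     * @return true if re-initialization succeeded
     */
    protected boolean reInit()
    {
        boolean ret = false;
        try{
            // always discard the broken connection so that a new one is created
            this.outboundQueueConnection = null;
            if(outboundQueueConnectionFactoryName!=null){
                // the factory can be looked up again from JNDI
                this.outboundQueueConnectionFactory = null;
            }
            initializeForeignQueueConnection();
            initializeLocalQueueConnection();
            initializeInboundJmsMessageConvertor();
            initializeOutboundJmsMessageConvertor();
            initializeInboundQueueBridges();
            initializeOutboundQueueBridges();
            ret = true;
        }catch(Exception e){
            ret = false;
            log.error("Failed to re-initialize the JMSConnector",e);
        }
        return ret;
    }

    /**
     * Called by the JMS provider when the foreign connection fails. Stops the
     * connector and retries re-initialization every 5 seconds until it succeeds.
     */
    public void onException(JMSException jmsException)
    {
        log.warn("Lost connection to foreign server, trying to reconnect.", jmsException);
        started.compareAndSet(true, false);
        boolean initSuccess = false;
        do{
            initSuccess = reInit();
            if(!initSuccess){
                log.warn("Still not able to connect to foreign server, waiting another 5 seconds before retrying.");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // restore the interrupt flag and give up so the thread can shut down
                    Thread.currentThread().interrupt();
                    return;
                }
            }else{
                log.info("Reconnected to foreign server successfully.");
                try {
                    this.start();
                } catch (Exception e) {
                    initSuccess = false;
                    log.warn("Failed to restart.", e);
                }
            }
        }while(!initSuccess);
    }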
    
    /**
     * @return Returns the inboundQueueBridges.
     */
    public InboundQueueBridge[] getInboundQueueBridges(){
        return inboundQueueBridges;
    }

    /**
     * @param inboundQueueBridges
     *            The inboundQueueBridges to set.
     */
    public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges){
        this.inboundQueueBridges=inboundQueueBridges;
    }

    /**
     * @return Returns the outboundQueueBridges.
     */
    public OutboundQueueBridge[] getOutboundQueueBridges(){
        return outboundQueueBridges;
    }

    /**
     * @param outboundQueueBridges
     *            The outboundQueueBridges to set.
     */
    public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges){
        this.outboundQueueBridges=outboundQueueBridges;
    }

    /**
     * @return Returns the localQueueConnectionFactory.
     */
    public QueueConnectionFactory getLocalQueueConnectionFactory(){
        return localQueueConnectionFactory;
    }

    /**
     * @param localQueueConnectionFactory
     *            The localQueueConnectionFactory to set.
     */
    public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory){
        this.localQueueConnectionFactory=localConnectionFactory;
    }

    /**
     * @return Returns the outboundQueueConnectionFactory.
     */
    public QueueConnectionFactory getOutboundQueueConnectionFactory(){
        return outboundQueueConnectionFactory;
    }

    /**
     * @return Returns the outboundQueueConnectionFactoryName.
     */
    public String getOutboundQueueConnectionFactoryName(){
        return outboundQueueConnectionFactoryName;
    }

    /**
     * @param outboundQueueConnectionFactoryName
     *            The outboundQueueConnectionFactoryName to set.
     */
    public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName){
        this.outboundQueueConnectionFactoryName=foreignQueueConnectionFactoryName;
    }

    /**
     * @return Returns the localConnectionFactoryName.
     */
    public String getLocalConnectionFactoryName(){
        return localConnectionFactoryName;
    }

    /**
     * @param localConnectionFactoryName
     *            The localConnectionFactoryName to set.
     */
    public void setLocalConnectionFactoryName(String localConnectionFactoryName){
        this.localConnectionFactoryName=localConnectionFactoryName;
    }

    /**
     * @return Returns the localQueueConnection.
     */
    public QueueConnection getLocalQueueConnection(){
        return localQueueConnection;
    }

    /**
     * @param localQueueConnection
     *            The localQueueConnection to set.
     */
    public void setLocalQueueConnection(QueueConnection localQueueConnection){
        this.localQueueConnection=localQueueConnection;
    }

    /**
     * @return Returns the outboundQueueConnection.
     */
    public QueueConnection getOutboundQueueConnection(){
        return outboundQueueConnection;
    }

    /**
     * @param outboundQueueConnection
     *            The outboundQueueConnection to set.
     */
    public void setOutboundQueueConnection(QueueConnection foreignQueueConnection){
        this.outboundQueueConnection=foreignQueueConnection;
    }

    /**
     * @param outboundQueueConnectionFactory
     *            The outboundQueueConnectionFactory to set.
     */
    public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory){
        this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
    }

    public void restartProducerConnection() throws NamingException, JMSException {
        outboundQueueConnection = null;
        initializeForeignQueueConnection();
        
    }

    protected void initializeForeignQueueConnection() throws NamingException,JMSException{
        
    	if(outboundQueueConnection==null){
            // get the connection factories
            if(outboundQueueConnectionFactory==null){
                // look it up from JNDI
                if(outboundQueueConnectionFactoryName!=null){
                    outboundQueueConnectionFactory=(QueueConnectionFactory) jndiOutboundTemplate.lookup(
                                    outboundQueueConnectionFactoryName,QueueConnectionFactory.class);
                    if(outboundUsername!=null){
                        outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername,
                                        outboundPassword);
                    }else{
                        outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
                    }
                }else {
                    throw new JMSException("Cannot create foreignConnection - no information");
                }
            }else {
                if(outboundUsername!=null){
                    outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername,
                                    outboundPassword);
                }else{
                    outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
                }
            }
        }
        outboundQueueConnection.start();
        outboundQueueConnection.setExceptionListener(this);
    }

    protected void initializeLocalQueueConnection() throws NamingException,JMSException{
        if(localQueueConnection==null){
            // get the connection factories
            if(localQueueConnectionFactory==null){
                if(embeddedConnectionFactory==null){
                    // look it up from JNDI
                    if(localConnectionFactoryName!=null){
                        localQueueConnectionFactory=(QueueConnectionFactory) jndiLocalTemplate.lookup(
                                        localConnectionFactoryName,QueueConnectionFactory.class);
                        if(localUsername!=null){
                            localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername,
                                            localPassword);
                        }else{
                            localQueueConnection=localQueueConnectionFactory.createQueueConnection();
                        }
                    }else {
                        throw new JMSException("Cannot create localConnection - no information");
                    }
                }else{
                    localQueueConnection = embeddedConnectionFactory.createQueueConnection();
                }
            }else {
                if(localUsername!=null){
                    localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername,
                                    localPassword);
                }else{
                    localQueueConnection=localQueueConnectionFactory.createQueueConnection();
                }
            }
        }
        localQueueConnection.start();
    }
    
    protected void initializeInboundJmsMessageConvertor(){
    	inboundMessageConvertor.setConnection(localQueueConnection);
    }
    
    protected void initializeOutboundJmsMessageConvertor(){
    	outboundMessageConvertor.setConnection(outboundQueueConnection);
    }

    protected void initializeInboundQueueBridges() throws JMSException{
        if(inboundQueueBridges!=null){
            QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<inboundQueueBridges.length;i++){
                InboundQueueBridge bridge=inboundQueueBridges[i];
                String localQueueName=bridge.getLocalQueueName();
                Queue activemqQueue=createActiveMQQueue(localSession,localQueueName);
				
                String queueName = bridge.getInboundQueueName();
                Queue foreignQueue=createForeignQueue(outboundSession,queueName);
                bridge.setConsumerQueue(foreignQueue);
                bridge.setProducerQueue(activemqQueue);
                bridge.setProducerConnection(localQueueConnection);
                bridge.setConsumerConnection(outboundQueueConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addInboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }

    /**
     * Wires each configured outbound bridge: consume from the local ActiveMQ
     * queue, produce to the foreign queue.
     */
    protected void initializeOutboundQueueBridges() throws JMSException{
        if(outboundQueueBridges!=null){
            // short-lived sessions, used only to resolve the Queue objects
            QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            try{
                for(int i=0;i<outboundQueueBridges.length;i++){
                    OutboundQueueBridge bridge=outboundQueueBridges[i];
                    String localQueueName=bridge.getLocalQueueName();
                    Queue activemqQueue=createActiveMQQueue(localSession,localQueueName);
                    String queueName=bridge.getOutboundQueueName();
                    Queue foreignQueue=createForeignQueue(outboundSession,queueName);
                    bridge.setConsumerQueue(activemqQueue);
                    bridge.setProducerQueue(foreignQueue);
                    bridge.setProducerConnection(outboundQueueConnection);
                    bridge.setConsumerConnection(localQueueConnection);
                    if(bridge.getJmsMessageConvertor()==null){
                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    addOutboundBridge(bridge);
                }
            }finally{
                // close both sessions even if setting up a bridge throws
                try{
                    outboundSession.close();
                }finally{
                    localSession.close();
                }
            }
        }
    }
    
    /**
     * Returns a temporary queue on the consumer side that is bridged back to the
     * given replyTo destination, creating and caching the bridge on first use.
     * Returns null if the bridge cannot be created.
     */
    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){
        Queue replyToProducerQueue =(Queue)destination;
        boolean isInbound = replyToProducerConnection.equals(localQueueConnection);

        if(isInbound){
            InboundQueueBridge bridge = (InboundQueueBridge) replyToBridges.get(replyToProducerQueue);
            if (bridge == null){
                bridge = new InboundQueueBridge(){
                    protected Destination processReplyToDestination(Destination destination){
                        return null;
                    }
                };
                try{
                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
                    replyToConsumerSession.close();
                    bridge.setConsumerQueue(replyToConsumerQueue);
                    bridge.setProducerQueue(replyToProducerQueue);
                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if(bridge.getJmsMessageConvertor()==null){
                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    log.info("Created replyTo bridge for " + replyToProducerQueue);
                }catch(Exception e){
                    log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
                    return null;
                }
                replyToBridges.put(replyToProducerQueue, bridge);
            }
            return bridge.getConsumerQueue();
        }else{
            OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(replyToProducerQueue);
            if (bridge == null){
                bridge = new OutboundQueueBridge(){
                    protected Destination processReplyToDestination(Destination destination){
                        return null;
                    }
                };
                try{
                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
                    replyToConsumerSession.close();
                    bridge.setConsumerQueue(replyToConsumerQueue);
                    bridge.setProducerQueue(replyToProducerQueue);
                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if(bridge.getJmsMessageConvertor()==null){
                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    log.info("Created replyTo bridge for " + replyToProducerQueue);
                }catch(Exception e){
                    log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
                    return null;
                }
                replyToBridges.put(replyToProducerQueue, bridge);
            }
            return bridge.getConsumerQueue();
        }
    }
    
    protected Queue createActiveMQQueue(QueueSession session,String queueName) throws JMSException{
        return session.createQueue(queueName);
    }
    
    protected Queue createForeignQueue(QueueSession session,String queueName) throws JMSException{
        Queue result = null;
        try{
            result = session.createQueue(queueName);
        }catch(JMSException e){
            // createQueue() failed on the foreign provider; fall back to a JNDI look-up
            try{
                result = (Queue) jndiOutboundTemplate.lookup(queueName, Queue.class);
            }catch(NamingException e1){
                String errStr = "Failed to look-up Queue for name: " + queueName;
                // log the look-up failure itself (e1), which is also the linked exception
                log.error(errStr,e1);
                JMSException jmsEx =  new JMSException(errStr);
                jmsEx.setLinkedException(e1);
                throw jmsEx;
            }
        }
        return result;
    }

    
}
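
For anyone reading only the methods above: they wire the bridges once, so the reconnect behaviour depends on calling them again after the foreign connection fails. Below is a minimal sketch of that retry idea in plain Java. It is not the ActiveMQ code. Reconnectable, closeQuietly(), connect(), and reconnect() are hypothetical names made up for illustration. In a real connector the retry would presumably be triggered from javax.jms.ExceptionListener.onException(), registered through Connection.setExceptionListener(), and connect() would re-create the foreign connection and re-run the bridge initialization.

```java
// Hypothetical sketch: none of these names exist in ActiveMQ.
final class BridgeReconnectSketch {

    /** Stand-in for whatever owns the foreign connection and its bridges. */
    interface Reconnectable {
        /** Drop the stale connection, sessions and producers, ignoring errors. */
        void closeQuietly();

        /** Create a fresh connection and re-initialize the bridges. */
        void connect() throws Exception;
    }

    /**
     * Tries connect() up to maxAttempts times, sleeping delayMillis between
     * failed attempts. Returns the number of attempts used on success, or -1
     * if every attempt failed or the thread was interrupted while waiting.
     */
    static int reconnect(Reconnectable target, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // always discard the old resources first, so nothing is left open
            target.closeQuietly();
            try {
                target.connect();
                return attempt;
            } catch (Exception e) {
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ie) {
                        // preserve the interrupt flag and give up
                        Thread.currentThread().interrupt();
                        return -1;
                    }
                }
            }
        }
        return -1;
    }
}
```

Calling closeQuietly() before every attempt is deliberate: it covers the "connections are not closed" half of the issue, since a half-open connection from a failed attempt is discarded before the next one is made.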

> CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-1228
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1228
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.0 RC2, 4.0.1
>            Reporter: William MacDonald
>
> I'm using the JMS to JMS Bridge functionality of ActiveMQ (4.0.1) to connect to a SunMQ JMS Broker (3.6 SP3 (Build 02-A)). I'm using two queues, one input and one output, with the following configuration:
>     <jmsBridgeConnectors>
>       <jmsQueueConnector outboundQueueConnectionFactory="#REMOTE">
>         <outboundQueueBridges>
>           <outboundQueueBridge outboundQueueName="SUNRECV"/>
>         </outboundQueueBridges>
>         <inboundQueueBridges>
>           <inboundQueueBridge inboundQueueName="SUNSEND"/>
>         </inboundQueueBridges>
>       </jmsQueueConnector>
>     </jmsBridgeConnectors>
> The system worked well until the SunMQ broker had to be restarted. This is what I found:
> 1. ActiveMQ is not aware of the remote broker shutdown. I waited for a while, but nothing in the ActiveMQ log indicates that it noticed the change.
> 2. When I send a message to the output queue SUNRECV, ActiveMQ complains that the producer is closed:
> [ERROR][2006/08/25.09:47:12.039][ActiveMQ Session Task]failed to forward message: ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:trabucco-43457-1156491843149-3:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:trabucco-43457-1156491843149-3:4:1:1, destination = queue://SUNRECV, transactionId = null, expiration = 0, timestamp = 1156492032027, arrival = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 2, properties = null, readOnlyProperties = true, readOnlyBody = true, text = 1}([C4064]: Cannot perform operation, producer is closed.)
> After this, it queues messages without sending them and logs:
> [DEBUG][2006/08/25.09:47:42.721][RMI TCP Connection(4)-10.95.89.20]No subscriptions registered, will not dispatch message at this time.
> Even after SunMQ is started again, ActiveMQ does not detect the change and continues queueing messages sent to SUNRECV.
> Please let me know if more information is needed to understand the situation.
