You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "William MacDonald (JIRA)" <ji...@apache.org> on 2007/04/12 16:04:34 UTC

[jira] Created: (AMQ-1228) CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.

CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.
------------------------------------------------------------------------------------------------------

                 Key: AMQ-1228
                 URL: https://issues.apache.org/activemq/browse/AMQ-1228
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 4.0 RC2, 4.0.1
            Reporter: William MacDonald



I'm using ActiveMQ (4.0.1) JMS to JMS Bridge functionality to connect to a  SunMQ JMS Broker (3.6 SP3  (Build 02-A)). I'm using two queues, an input and an output one, with the following configuration:


    <jmsBridgeConnectors>
      <jmsQueueConnector outboundQueueConnectionFactory="#REMOTE">
      <outboundQueueBridges>
        <outboundQueueBridge outboundQueueName="SUNRECV"/>
      </outboundQueueBridges>
      <inboundQueueBridges>
        <inboundQueueBridge inboundQueueName="SUNSEND"/>
      </inboundQueueBridges>
      </jmsQueueConnector>
    </jmsBridgeConnectors>


The system works really well until the SunMQ broker needed to be restarted. This is what I found:
1.-ActiveMQ is not aware of the remote broker shutdown. I waited for a while, but no log on ActiveMQ indicates knowledge about the new situation.
2.-When I send a message to the output queue SUNRECV, ActiveMQ complains that the producer is closed:

[ERROR][2006/08/25.09:47:12.039][ActiveMQ Session Task]failed to forward message: ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:trabucco-43457-1156491843149-3:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:trabucco-43457-1156491843149-3:4:1:1, destination = queue://SUNRECV, transactionId = null, expiration = 0, timestamp = 1156492032027, arrival = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 2, properties = null, readOnlyProperties = true, readOnlyBody = true, text = 1}([C4064]: Cannot perform operation, producer is closed.)

 After this, it is automatically queueing messages without sending them, showing the log:

[DEBUG][2006/08/25.09:47:42.721][RMI TCP Connection(4)-10.95.89.20]No subscriptions registered, will not dispatch message at this time.

 Even if SunMQ is started again, ActiveMQ is not detecting the new situation, and continues queueing messages sent to SUNRECV.

Please, make me know if more information is needed to understand the situation.


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Closed: (AMQ-1228) CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.

Posted by "William MacDonald (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/activemq/browse/AMQ-1228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

William MacDonald closed AMQ-1228.
----------------------------------

    Resolution: Duplicate

> CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-1228
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1228
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.0 RC2, 4.0.1
>            Reporter: William MacDonald
>
> I'm using ActiveMQ (4.0.1) JMS to JMS Bridge functionality to connect to a  SunMQ JMS Broker (3.6 SP3  (Build 02-A)). I'm using two queues, an input and an output one, with the following configuration:
>     <jmsBridgeConnectors>
>       <jmsQueueConnector outboundQueueConnectionFactory="#REMOTE">
>       <outboundQueueBridges>
>         <outboundQueueBridge outboundQueueName="SUNRECV"/>
>       </outboundQueueBridges>
>       <inboundQueueBridges>
>         <inboundQueueBridge inboundQueueName="SUNSEND"/>
>       </inboundQueueBridges>
>       </jmsQueueConnector>
>     </jmsBridgeConnectors>
> The system works really well until the SunMQ broker needed to be restarted. This is what I found:
> 1.-ActiveMQ is not aware of the remote broker shutdown. I waited for a while, but no log on ActiveMQ indicates knowledge about the new situation.
> 2.-When I send a message to the output queue SUNRECV, ActiveMQ complains that the producer is closed:
> [ERROR][2006/08/25.09:47:12.039][ActiveMQ Session Task]failed to forward message: ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:trabucco-43457-1156491843149-3:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:trabucco-43457-1156491843149-3:4:1:1, destination = queue://SUNRECV, transactionId = null, expiration = 0, timestamp = 1156492032027, arrival = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 2, properties = null, readOnlyProperties = true, readOnlyBody = true, text = 1}([C4064]: Cannot perform operation, producer is closed.)
>  After this, it is automatically queueing messages without sending them, showing the log:
> [DEBUG][2006/08/25.09:47:42.721][RMI TCP Connection(4)-10.95.89.20]No subscriptions registered, will not dispatch message at this time.
>  Even if SunMQ is started again, ActiveMQ is not detecting the new situation, and continues queueing messages sent to SUNRECV.
> Please, make me know if more information is needed to understand the situation.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (AMQ-1228) CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.

Posted by "Sunny Liu (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/activemq/browse/AMQ-1228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_40117 ] 

Sunny Liu commented on AMQ-1228:
--------------------------------

I have experience same problem with openJMS connection. However, I have fixed it in org.apache.activemq.network.jms.JmsQueueConnector and org.apache.activemq.network.jms.JmsTopicConnector. I have tested new change it work fine for me.

Here is code.

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Bridge to other JMS Topic providers
 * 
 * @org.apache.xbean.XBean
 * 
 * @version $Revision: 1.1.1.1 $
 */
public class JmsTopicConnector extends JmsConnector
implements ExceptionListener
{
    private static final Log log=LogFactory.getLog(JmsTopicConnector.class);
    private String outboundTopicConnectionFactoryName;
    private String localConnectionFactoryName;
    private TopicConnectionFactory outboundTopicConnectionFactory;
    private TopicConnectionFactory localTopicConnectionFactory;
    private TopicConnection outboundTopicConnection;
    private TopicConnection localTopicConnection;
    private InboundTopicBridge[] inboundTopicBridges;
    private OutboundTopicBridge[] outboundTopicBridges;
    
    public boolean init(){
        boolean result=super.init();
        if(result){
            try{
                initializeForeignTopicConnection();
                initializeLocalTopicConnection();
                initializeInboundJmsMessageConvertor();
                initializeOutboundJmsMessageConvertor();
                initializeInboundTopicBridges();
                initializeOutboundTopicBridges();
            }catch(Exception e){
                log.error("Failed to initialize the JMSConnector",e);
            }
        }
        return result;
    }   
    

    protected boolean reInit()
    {
    	boolean ret = false;
    	try{            
    		if(outboundTopicConnectionFactoryName!=null){
            	this.outboundTopicConnection = null;
            	this.outboundTopicConnectionFactory = null;
            }
    		initializeForeignTopicConnection();
            initializeLocalTopicConnection();
            initializeInboundJmsMessageConvertor();
            initializeOutboundJmsMessageConvertor();
            initializeInboundTopicBridges();
            initializeOutboundTopicBridges();
            ret = true;
        }catch(Exception e){
            ret = false;
        	log.error("Failed to initialize the JMSConnector",e);
        }
        return ret;
    }
    
    public void onException(JMSException jmsException)
    {
    	if(started.get()) started.compareAndSet(true, false);
    	boolean  initSuccess = false;
    	do{
    		initSuccess = reInit();
			if(!initSuccess){
				log.warn("Still not able to connect to foreign server, wait another 5 second and try again.");
				try {				
	        		Thread.sleep(5000);
				} catch (Exception e) {					
					;
				}				
			}else{				
				log.warn("reconnect to foreign server successfully.");	
				try {
					this.start();
				} catch (Exception e) {
					initSuccess = false;
					log.warn("Failed to restart.", e);
				}
			}
		}while(!initSuccess);
		
	}
    
    /**
     * @return Returns the inboundTopicBridges.
     */
    public InboundTopicBridge[] getInboundTopicBridges(){
        return inboundTopicBridges;
    }

    /**
     * @param inboundTopicBridges
     *            The inboundTopicBridges to set.
     */
    public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges){
        this.inboundTopicBridges=inboundTopicBridges;
    }

    /**
     * @return Returns the outboundTopicBridges.
     */
    public OutboundTopicBridge[] getOutboundTopicBridges(){
        return outboundTopicBridges;
    }

    /**
     * @param outboundTopicBridges
     *            The outboundTopicBridges to set.
     */
    public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges){
        this.outboundTopicBridges=outboundTopicBridges;
    }

    /**
     * @return Returns the localTopicConnectionFactory.
     */
    public TopicConnectionFactory getLocalTopicConnectionFactory(){
        return localTopicConnectionFactory;
    }

    /**
     * @param localTopicConnectionFactory
     *            The localTopicConnectionFactory to set.
     */
    public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory){
        this.localTopicConnectionFactory=localConnectionFactory;
    }

    /**
     * @return Returns the outboundTopicConnectionFactory.
     */
    public TopicConnectionFactory getOutboundTopicConnectionFactory(){
        return outboundTopicConnectionFactory;
    }

    /**
     * @return Returns the outboundTopicConnectionFactoryName.
     */
    public String getOutboundTopicConnectionFactoryName(){
        return outboundTopicConnectionFactoryName;
    }

    /**
     * @param outboundTopicConnectionFactoryName
     *            The outboundTopicConnectionFactoryName to set.
     */
    public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName){
        this.outboundTopicConnectionFactoryName=foreignTopicConnectionFactoryName;
    }

    /**
     * @return Returns the localConnectionFactoryName.
     */
    public String getLocalConnectionFactoryName(){
        return localConnectionFactoryName;
    }

    /**
     * @param localConnectionFactoryName
     *            The localConnectionFactoryName to set.
     */
    public void setLocalConnectionFactoryName(String localConnectionFactoryName){
        this.localConnectionFactoryName=localConnectionFactoryName;
    }

    /**
     * @return Returns the localTopicConnection.
     */
    public TopicConnection getLocalTopicConnection(){
        return localTopicConnection;
    }

    /**
     * @param localTopicConnection
     *            The localTopicConnection to set.
     */
    public void setLocalTopicConnection(TopicConnection localTopicConnection){
        this.localTopicConnection=localTopicConnection;
    }

    /**
     * @return Returns the outboundTopicConnection.
     */
    public TopicConnection getOutboundTopicConnection(){
        return outboundTopicConnection;
    }

    /**
     * @param outboundTopicConnection
     *            The outboundTopicConnection to set.
     */
    public void setOutboundTopicConnection(TopicConnection foreignTopicConnection){
        this.outboundTopicConnection=foreignTopicConnection;
    }

    /**
     * @param outboundTopicConnectionFactory
     *            The outboundTopicConnectionFactory to set.
     */
    public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory){
        this.outboundTopicConnectionFactory=foreignTopicConnectionFactory;
    }


    public void restartProducerConnection() throws NamingException, JMSException {
        outboundTopicConnection = null;
        initializeForeignTopicConnection();
    }

    protected void initializeForeignTopicConnection() throws NamingException,JMSException{
        if(outboundTopicConnection==null){
            // get the connection factories
            if(outboundTopicConnectionFactory==null){
                // look it up from JNDI
                if(outboundTopicConnectionFactoryName!=null){
                    outboundTopicConnectionFactory=(TopicConnectionFactory) jndiOutboundTemplate.lookup(
                                    outboundTopicConnectionFactoryName,TopicConnectionFactory.class);
                    if(outboundUsername!=null){
                        outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername,
                                        outboundPassword);
                    }else{
                        outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
                    }
                }else {
                    throw new JMSException("Cannot create localConnection - no information");
                }
            }else {
                if(outboundUsername!=null){
                    outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername,
                                    outboundPassword);
                }else{
                    outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
                }
            }
        }
        outboundTopicConnection.start();
    }

    protected void initializeLocalTopicConnection() throws NamingException,JMSException{
        if(localTopicConnection==null){
            // get the connection factories
            if(localTopicConnectionFactory==null){
                if(embeddedConnectionFactory==null){
                    // look it up from JNDI
                    if(localConnectionFactoryName!=null){
                        localTopicConnectionFactory=(TopicConnectionFactory) jndiLocalTemplate.lookup(
                                        localConnectionFactoryName,TopicConnectionFactory.class);
                        if(localUsername!=null){
                            localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername,
                                            localPassword);
                        }else{
                            localTopicConnection=localTopicConnectionFactory.createTopicConnection();
                        }
                    }else {
                        throw new JMSException("Cannot create localConnection - no information");
                    }
                }else{
                    localTopicConnection = embeddedConnectionFactory.createTopicConnection();
                }
            }else {
                if(localUsername!=null){
                    localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername,
                                    localPassword);
                }else{
                    localTopicConnection=localTopicConnectionFactory.createTopicConnection();
                }
            }
        }
        localTopicConnection.start();
    }
    
    protected void initializeInboundJmsMessageConvertor(){
    	inboundMessageConvertor.setConnection(localTopicConnection);
    }
    
    protected void initializeOutboundJmsMessageConvertor(){
    	outboundMessageConvertor.setConnection(outboundTopicConnection);
    }

    protected void initializeInboundTopicBridges() throws JMSException{
        if(inboundTopicBridges!=null){
            TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<inboundTopicBridges.length;i++){
                InboundTopicBridge bridge=inboundTopicBridges[i];
                String localTopicName=bridge.getLocalTopicName();          
                Topic activemqTopic=createActiveMQTopic(localSession,localTopicName);
                String topicName=bridge.getInboundTopicName();
                Topic foreignTopic=createForeignTopic(outboundSession,topicName);
                bridge.setConsumerTopic(foreignTopic);
                bridge.setProducerTopic(activemqTopic);
                bridge.setProducerConnection(localTopicConnection);
                bridge.setConsumerConnection(outboundTopicConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addInboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }

    protected void initializeOutboundTopicBridges() throws JMSException{
        if(outboundTopicBridges!=null){
            TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<outboundTopicBridges.length;i++){
                OutboundTopicBridge bridge=outboundTopicBridges[i];
                String localTopicName=bridge.getLocalTopicName();
                Topic activemqTopic=createActiveMQTopic(localSession,localTopicName);
                String topicName=bridge.getOutboundTopicName();
                Topic foreignTopic=createForeignTopic(outboundSession,topicName);
                bridge.setConsumerTopic(activemqTopic);
                bridge.setProducerTopic(foreignTopic);
                bridge.setProducerConnection(outboundTopicConnection);
                bridge.setConsumerConnection(localTopicConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addOutboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }
    
    protected  Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){
    	Topic replyToProducerTopic =(Topic)destination;
    	boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
    	
    	if(isInbound){
    		InboundTopicBridge bridge = (InboundTopicBridge) replyToBridges.get(replyToProducerTopic);
    		if (bridge == null){
    			bridge = new InboundTopicBridge(){
    				protected Destination processReplyToDestination (Destination destination){
    					return null;
    				}
    			};
    			try{
    				TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
    				Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
    				replyToConsumerSession.close();
    				bridge.setConsumerTopic(replyToConsumerTopic);
    				bridge.setProducerTopic(replyToProducerTopic);
    				bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
    				bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
    				bridge.setDoHandleReplyTo(false);
    				if(bridge.getJmsMessageConvertor()==null){
    					bridge.setJmsMessageConvertor(getInboundMessageConvertor());
    				}
    				bridge.setJmsConnector(this);
    				bridge.start();
    				log.info("Created replyTo bridge for " + replyToProducerTopic);
    			}catch(Exception e){
    				log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
    				return null;
    			}
    			replyToBridges.put(replyToProducerTopic, bridge);
    		}
    		return bridge.getConsumerTopic();
    	}else{
    		OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(replyToProducerTopic);
    		if (bridge == null){
    			bridge = new OutboundTopicBridge(){
    				protected Destination processReplyToDestination (Destination destination){
    					return null;
    				}
    			};
    			try{
    				TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
    				Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
    				replyToConsumerSession.close();
    				bridge.setConsumerTopic(replyToConsumerTopic);
    				bridge.setProducerTopic(replyToProducerTopic);
    				bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
    				bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
    				bridge.setDoHandleReplyTo(false);
    				if(bridge.getJmsMessageConvertor()==null){
    					bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
    				}
    				bridge.setJmsConnector(this);
    				bridge.start();
    				log.info("Created replyTo bridge for " + replyToProducerTopic);
    			}catch(Exception e){
    				log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
    				return null;
    			}
    			replyToBridges.put(replyToProducerTopic, bridge);
    		}
    		return bridge.getConsumerTopic();
    	}		
    }
    
    protected Topic createActiveMQTopic(TopicSession session,String topicName) throws JMSException{
        return session.createTopic(topicName);
    }
    
    protected Topic createForeignTopic(TopicSession session,String topicName) throws JMSException{
        Topic result = null;
        try{
            result = session.createTopic(topicName);
        }catch(JMSException e){
            //look-up the Topic
            try{
                result = (Topic) jndiOutboundTemplate.lookup(topicName, Topic.class);
            }catch(NamingException e1){
                String errStr = "Failed to look-up Topic for name: " + topicName;
                log.error(errStr,e);
                JMSException jmsEx =  new JMSException(errStr);
                jmsEx.setLinkedException(e1);
                throw jmsEx;
            }
        }
        return result;
    }

    
}


///======================================================


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
/**
 * A Bridge to other JMS Queue providers
 * 
 * @org.apache.xbean.XBean
 *
 * @version $Revision: 1.1.1.1 $
 */
public class JmsQueueConnector extends JmsConnector
implements ExceptionListener
{
    private static final Log log=LogFactory.getLog(JmsQueueConnector.class);
    private String outboundQueueConnectionFactoryName;
    private String localConnectionFactoryName;
    
    private QueueConnectionFactory outboundQueueConnectionFactory;
    private QueueConnectionFactory localQueueConnectionFactory;
    
    private QueueConnection outboundQueueConnection;
    private QueueConnection localQueueConnection;
    
    private InboundQueueBridge[] inboundQueueBridges;
    private OutboundQueueBridge[] outboundQueueBridges;   

    public boolean init(){
        boolean result=super.init();
        if(result){
            try{
                initializeForeignQueueConnection();
                initializeLocalQueueConnection();
                initializeInboundJmsMessageConvertor();
                initializeOutboundJmsMessageConvertor();
                initializeInboundQueueBridges();
                initializeOutboundQueueBridges();
            }catch(Exception e){                
            	log.error("Failed to initialize the JMSConnector",e);
            }
        }
        return result;
    }   
    
    protected boolean reInit()
    {
    	boolean ret = false;
    	try{            
    		if(outboundQueueConnectionFactoryName!=null){
            	this.outboundQueueConnection = null;
            	this.outboundQueueConnectionFactory = null;
            }
            initializeForeignQueueConnection();
            initializeLocalQueueConnection();
            initializeInboundJmsMessageConvertor();
            initializeOutboundJmsMessageConvertor();
            initializeInboundQueueBridges();
            initializeOutboundQueueBridges();
            ret = true;
        }catch(Exception e){
            ret = false;
        	log.error("Failed to initialize the JMSConnector",e);
        }
        return ret;
    }
    
    public void onException(JMSException jmsException)
    {
    	if(started.get()) started.compareAndSet(true, false);
    	boolean  initSuccess = false;
    	do{
    		initSuccess = reInit();
			if(!initSuccess){
				log.warn("Still not able to connect to foreign server, wait another 5 second and try again.");
				try {				
	        		Thread.sleep(5000);
				} catch (Exception e) {					
					;
				}				
			}else{				
				log.warn("reconnect to foreign server successfully.");	
				try {
					this.start();
				} catch (Exception e) {
					initSuccess = false;
					log.warn("Failed to restart.", e);
				}
			}
		}while(!initSuccess);
		
	}
    
    /**
     * @return Returns the inboundQueueBridges.
     */
    public InboundQueueBridge[] getInboundQueueBridges(){
        return inboundQueueBridges;
    }

    /**
     * @param inboundQueueBridges
     *            The inboundQueueBridges to set.
     */
    public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges){
        this.inboundQueueBridges=inboundQueueBridges;
    }

    /**
     * @return Returns the outboundQueueBridges.
     */
    public OutboundQueueBridge[] getOutboundQueueBridges(){
        return outboundQueueBridges;
    }

    /**
     * @param outboundQueueBridges
     *            The outboundQueueBridges to set.
     */
    public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges){
        this.outboundQueueBridges=outboundQueueBridges;
    }

    /**
     * @return Returns the localQueueConnectionFactory.
     */
    public QueueConnectionFactory getLocalQueueConnectionFactory(){
        return localQueueConnectionFactory;
    }

    /**
     * @param localQueueConnectionFactory
     *            The localQueueConnectionFactory to set.
     */
    public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory){
        this.localQueueConnectionFactory=localConnectionFactory;
    }

    /**
     * @return Returns the outboundQueueConnectionFactory.
     */
    public QueueConnectionFactory getOutboundQueueConnectionFactory(){
        return outboundQueueConnectionFactory;
    }

    /**
     * @return Returns the outboundQueueConnectionFactoryName.
     */
    public String getOutboundQueueConnectionFactoryName(){
        return outboundQueueConnectionFactoryName;
    }

    /**
     * @param outboundQueueConnectionFactoryName
     *            The outboundQueueConnectionFactoryName to set.
     */
    public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName){
        this.outboundQueueConnectionFactoryName=foreignQueueConnectionFactoryName;
    }

    /**
     * @return Returns the localConnectionFactoryName.
     */
    public String getLocalConnectionFactoryName(){
        return localConnectionFactoryName;
    }

    /**
     * @param localConnectionFactoryName
     *            The localConnectionFactoryName to set.
     */
    public void setLocalConnectionFactoryName(String localConnectionFactoryName){
        this.localConnectionFactoryName=localConnectionFactoryName;
    }

    /**
     * @return Returns the localQueueConnection.
     */
    public QueueConnection getLocalQueueConnection(){
        return localQueueConnection;
    }

    /**
     * @param localQueueConnection
     *            The localQueueConnection to set.
     */
    public void setLocalQueueConnection(QueueConnection localQueueConnection){
        this.localQueueConnection=localQueueConnection;
    }

    /**
     * @return Returns the outboundQueueConnection.
     */
    public QueueConnection getOutboundQueueConnection(){
        return outboundQueueConnection;
    }

    /**
     * @param outboundQueueConnection
     *            The outboundQueueConnection to set.
     */
    public void setOutboundQueueConnection(QueueConnection foreignQueueConnection){
        this.outboundQueueConnection=foreignQueueConnection;
    }

    /**
     * @param outboundQueueConnectionFactory
     *            The outboundQueueConnectionFactory to set.
     */
    public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory){
        this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
    }

    public void restartProducerConnection() throws NamingException, JMSException {
        outboundQueueConnection = null;
        initializeForeignQueueConnection();
        
    }

    protected void initializeForeignQueueConnection() throws NamingException,JMSException{
        
    	if(outboundQueueConnection==null){
            // get the connection factories
            if(outboundQueueConnectionFactory==null){
                // look it up from JNDI
                if(outboundQueueConnectionFactoryName!=null){
                    outboundQueueConnectionFactory=(QueueConnectionFactory) jndiOutboundTemplate.lookup(
                                    outboundQueueConnectionFactoryName,QueueConnectionFactory.class);
                    if(outboundUsername!=null){
                        outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername,
                                        outboundPassword);
                    }else{
                        outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
                    }
                }else {
                    throw new JMSException("Cannot create localConnection - no information");
                }
            }else {
                if(outboundUsername!=null){
                    outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername,
                                    outboundPassword);
                }else{
                    outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
                }
            }
        }
        outboundQueueConnection.start();
        outboundQueueConnection.setExceptionListener(this);
    }

    protected void initializeLocalQueueConnection() throws NamingException,JMSException{
        if(localQueueConnection==null){
            // get the connection factories
            if(localQueueConnectionFactory==null){
                if(embeddedConnectionFactory==null){
                    // look it up from JNDI
                    if(localConnectionFactoryName!=null){
                        localQueueConnectionFactory=(QueueConnectionFactory) jndiLocalTemplate.lookup(
                                        localConnectionFactoryName,QueueConnectionFactory.class);
                        if(localUsername!=null){
                            localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername,
                                            localPassword);
                        }else{
                            localQueueConnection=localQueueConnectionFactory.createQueueConnection();
                        }
                    }else {
                        throw new JMSException("Cannot create localConnection - no information");
                    }
                }else{
                    localQueueConnection = embeddedConnectionFactory.createQueueConnection();
                }
            }else {
                if(localUsername!=null){
                    localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername,
                                    localPassword);
                }else{
                    localQueueConnection=localQueueConnectionFactory.createQueueConnection();
                }
            }
        }
        localQueueConnection.start();
    }
    
    protected void initializeInboundJmsMessageConvertor(){
    	inboundMessageConvertor.setConnection(localQueueConnection);
    }
    
    protected void initializeOutboundJmsMessageConvertor(){
    	outboundMessageConvertor.setConnection(outboundQueueConnection);
    }

    protected void initializeInboundQueueBridges() throws JMSException{
        if(inboundQueueBridges!=null){
            QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<inboundQueueBridges.length;i++){
                InboundQueueBridge bridge=inboundQueueBridges[i];
                String localQueueName=bridge.getLocalQueueName();
                Queue activemqQueue=createActiveMQQueue(localSession,localQueueName);
				
                String queueName = bridge.getInboundQueueName();
                Queue foreignQueue=createForeignQueue(outboundSession,queueName);
                bridge.setConsumerQueue(foreignQueue);
                bridge.setProducerQueue(activemqQueue);
                bridge.setProducerConnection(localQueueConnection);
                bridge.setConsumerConnection(outboundQueueConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addInboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }

    protected void initializeOutboundQueueBridges() throws JMSException{
        if(outboundQueueBridges!=null){
            QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<outboundQueueBridges.length;i++){
                OutboundQueueBridge bridge=outboundQueueBridges[i];
                String localQueueName=bridge.getLocalQueueName();              
                Queue activemqQueue=createActiveMQQueue(localSession,localQueueName);
                String queueName=bridge.getOutboundQueueName();
                Queue foreignQueue=createForeignQueue(outboundSession,queueName);
                bridge.setConsumerQueue(activemqQueue);
                bridge.setProducerQueue(foreignQueue);
                bridge.setProducerConnection(outboundQueueConnection);
                bridge.setConsumerConnection(localQueueConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addOutboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }
    
    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){        
    	Queue replyToProducerQueue =(Queue)destination;
    	boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
    	
    	if(isInbound){
    		InboundQueueBridge bridge = (InboundQueueBridge) replyToBridges.get(replyToProducerQueue);
    		if (bridge == null){
    			bridge = new InboundQueueBridge(){
    				protected Destination processReplyToDestination (Destination destination){
    					return null;
    				}
    			};
    			try{
    				QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
    				Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
    				replyToConsumerSession.close();
    				bridge.setConsumerQueue(replyToConsumerQueue);
    				bridge.setProducerQueue(replyToProducerQueue);
    				bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
    				bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
    				bridge.setDoHandleReplyTo(false);
    				if(bridge.getJmsMessageConvertor()==null){
    					bridge.setJmsMessageConvertor(getInboundMessageConvertor());
    				}
    				bridge.setJmsConnector(this);
    				bridge.start();
    				log.info("Created replyTo bridge for " + replyToProducerQueue);
    			}catch(Exception e){
    				log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
    				return null;
    			}
    			replyToBridges.put(replyToProducerQueue, bridge);
    		}
    		return bridge.getConsumerQueue();
    	}else{
    		OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(replyToProducerQueue);
    		if (bridge == null){
    			bridge = new OutboundQueueBridge(){
    				protected Destination processReplyToDestination (Destination destination){
    					return null;
    				}
    			};
    			try{
    				QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
    				Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
    				replyToConsumerSession.close();
    				bridge.setConsumerQueue(replyToConsumerQueue);
    				bridge.setProducerQueue(replyToProducerQueue);
    				bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
    				bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
    				bridge.setDoHandleReplyTo(false);
    				if(bridge.getJmsMessageConvertor()==null){
    					bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
    				}
    				bridge.setJmsConnector(this);
    				bridge.start();
    				log.info("Created replyTo bridge for " + replyToProducerQueue);
    			}catch(Exception e){
    				log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
    				return null;
    			}
    			replyToBridges.put(replyToProducerQueue, bridge);
    		}
    		return bridge.getConsumerQueue();
    	}		
    }
    
    protected Queue createActiveMQQueue(QueueSession session,String queueName) throws JMSException{
        return session.createQueue(queueName);
    }
    
    protected Queue createForeignQueue(QueueSession session,String queueName) throws JMSException{
        Queue result = null;
        try{
            result = session.createQueue(queueName);
        }catch(JMSException e){
            //look-up the Queue
            try{
                result = (Queue) jndiOutboundTemplate.lookup(queueName, Queue.class);
            }catch(NamingException e1){
                String errStr = "Failed to look-up Queue for name: " + queueName;
                log.error(errStr,e);
                JMSException jmsEx =  new JMSException(errStr);
                jmsEx.setLinkedException(e1);
                throw jmsEx;
            }
        }
        return result;
    }

    
}

> CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-1228
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1228
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.0 RC2, 4.0.1
>            Reporter: William MacDonald
>
> I'm using ActiveMQ (4.0.1) JMS to JMS Bridge functionality to connect to a  SunMQ JMS Broker (3.6 SP3  (Build 02-A)). I'm using two queues, an input and an output one, with the following configuration:
>     <jmsBridgeConnectors>
>       <jmsQueueConnector outboundQueueConnectionFactory="#REMOTE">
>       <outboundQueueBridges>
>         <outboundQueueBridge outboundQueueName="SUNRECV"/>
>       </outboundQueueBridges>
>       <inboundQueueBridges>
>         <inboundQueueBridge inboundQueueName="SUNSEND"/>
>       </inboundQueueBridges>
>       </jmsQueueConnector>
>     </jmsBridgeConnectors>
> The system works really well until the SunMQ broker needed to be restarted. This is what I found:
> 1.-ActiveMQ is not aware of the remote broker shutdown. I waited for a while, but no log on ActiveMQ indicates knowledge about the new situation.
> 2.-When I send a message to the output queue SUNRECV, ActiveMQ complains that the producer is closed:
> [ERROR][2006/08/25.09:47:12.039][ActiveMQ Session Task]failed to forward message: ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:trabucco-43457-1156491843149-3:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:trabucco-43457-1156491843149-3:4:1:1, destination = queue://SUNRECV, transactionId = null, expiration = 0, timestamp = 1156492032027, arrival = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 2, properties = null, readOnlyProperties = true, readOnlyBody = true, text = 1}([C4064]: Cannot perform operation, producer is closed.)
>  After this, it is automatically queueing messages without sending them, showing the log:
> [DEBUG][2006/08/25.09:47:42.721][RMI TCP Connection(4)-10.95.89.20]No subscriptions registered, will not dispatch message at this time.
>  Even if SunMQ is started again, ActiveMQ is not detecting the new situation, and continues queueing messages sent to SUNRECV.
> Please, make me know if more information is needed to understand the situation.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (AMQ-1228) CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.

Posted by "William MacDonald (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/activemq/browse/AMQ-1228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_38996 ] 

William MacDonald commented on AMQ-1228:
----------------------------------------

I have come across this issue in an attempt to use the JMS to JMS bridge in version 4.1.1. 
I am trying to bridge ActiveMQ to Websphere MQ and I am running the bridge in its own process and everything connects and works properly.

The remote Websphere MQ broker is being shutdown every night for backups and when this occurs the bridge does not see the disconnect of the remote broker.

I performed a netstat of the socket connections to see if the bridge was still attempting a connection and found that the socket connections to the remote broker are in a CLOSE_WAIT state.

If you have any ideas or wish to try a test I am willing to do what every is needed to resolve this issue.

Thanks,

William


> CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and connections are not closed.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-1228
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1228
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.0 RC2, 4.0.1
>            Reporter: William MacDonald
>
> I'm using ActiveMQ (4.0.1) JMS to JMS Bridge functionality to connect to a  SunMQ JMS Broker (3.6 SP3  (Build 02-A)). I'm using two queues, an input and an output one, with the following configuration:
>     <jmsBridgeConnectors>
>       <jmsQueueConnector outboundQueueConnectionFactory="#REMOTE">
>       <outboundQueueBridges>
>         <outboundQueueBridge outboundQueueName="SUNRECV"/>
>       </outboundQueueBridges>
>       <inboundQueueBridges>
>         <inboundQueueBridge inboundQueueName="SUNSEND"/>
>       </inboundQueueBridges>
>       </jmsQueueConnector>
>     </jmsBridgeConnectors>
> The system works really well until the SunMQ broker needed to be restarted. This is what I found:
> 1.-ActiveMQ is not aware of the remote broker shutdown. I waited for a while, but no log on ActiveMQ indicates knowledge about the new situation.
> 2.-When I send a message to the output queue SUNRECV, ActiveMQ complains that the producer is closed:
> [ERROR][2006/08/25.09:47:12.039][ActiveMQ Session Task]failed to forward message: ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:trabucco-43457-1156491843149-3:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:trabucco-43457-1156491843149-3:4:1:1, destination = queue://SUNRECV, transactionId = null, expiration = 0, timestamp = 1156492032027, arrival = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 2, properties = null, readOnlyProperties = true, readOnlyBody = true, text = 1}([C4064]: Cannot perform operation, producer is closed.)
>  After this, it is automatically queueing messages without sending them, showing the log:
> [DEBUG][2006/08/25.09:47:42.721][RMI TCP Connection(4)-10.95.89.20]No subscriptions registered, will not dispatch message at this time.
>  Even if SunMQ is started again, ActiveMQ is not detecting the new situation, and continues queueing messages sent to SUNRECV.
> Please, make me know if more information is needed to understand the situation.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.