You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2009/03/04 23:42:04 UTC

svn commit: r750205 - /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java

Author: rajith
Date: Wed Mar  4 22:42:04 2009
New Revision: 750205

URL: http://svn.apache.org/viewvc?rev=750205&view=rev
Log:
This is related to QPID-1713
The automated test case for this is still being worked out.
However this feature has been tested manually.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=750205&r1=750204&r2=750205&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Wed Mar  4 22:42:04 2009
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.jms.failover;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -52,7 +53,7 @@
  * from the list.   
  */
 
-public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener
+public class FailoverExchangeMethod implements FailoverMethod, MessageListener
 {
     private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class);
    
@@ -65,17 +66,29 @@
     /** The session used to subscribe to failover exchange */
     private Session _ssn;
     
-    private BrokerDetails _orginalBrokerDetail;
+    private BrokerDetails _originalBrokerDetail;
+    
+    /** The index into the hostDetails array of the broker to which we are connected */
+    private int _currentBrokerIndex = 0;
+    
+    /** The broker currently selected **/
+    private BrokerDetails _currentBrokerDetail;
+        
+    /** Array of BrokerDetail used to make connections. */
+    private ConnectionURL _connectionDetails;
+    
+    /** Denotes the number of failed attempts **/
+    private int _failedAttemps = 0;
     
     public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn)
     {
-        super(connectionDetails);        
-        _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0);
+        _connectionDetails = connectionDetails;
+        _originalBrokerDetail = _connectionDetails.getBrokerDetails(0);
         
         // This is not safe to use until attainConnection is called, as this ref will not initialized fully.
         // The reason being this constructor is called inside the AMWConnection constructor.
         // It would be best if we find a way to pass this ref after AMQConnection is fully initialized.
-        _conn = conn; 
+        _conn = conn;
     }
 
     private void subscribeForUpdates() throws JMSException
@@ -96,6 +109,17 @@
     public void onMessage(Message m)
     {
         _logger.info("Failover exchange notified cluster membership change");
+        
+        String currentBrokerIP = ""; 
+        try
+        {
+            currentBrokerIP = InetAddress.getByName(_currentBrokerDetail.getHost()).getHostAddress();
+        }
+        catch(Exception e)
+        {
+            _logger.warn("Unable to resolve current broker host name",e);
+        }
+        
         List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>();
         try
         {            
@@ -109,15 +133,22 @@
                 for (String url:urls)
                 {
                     String[] tokens = url.split(":");
-                    if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport()))
+                    if (tokens[0].equalsIgnoreCase(_originalBrokerDetail.getTransport()))
                     {
                         BrokerDetails broker = new AMQBrokerDetails();
                         broker.setTransport(tokens[0]);
                         broker.setHost(tokens[1]);
                         broker.setPort(Integer.parseInt(tokens[2]));
-                        broker.setProperties(_orginalBrokerDetail.getProperties());
-                        broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration());
+                        broker.setProperties(_originalBrokerDetail.getProperties());
+                        broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
                         brokerList.add(broker);
+                        
+                        if (currentBrokerIP.equals(broker.getHost()) && 
+                            _currentBrokerDetail.getPort() == broker.getPort())
+                        {
+                            _currentBrokerIndex = brokerList.indexOf(broker);
+                        }
+                        
                         break;
                     }
                 }                
@@ -132,13 +163,20 @@
         {
             _connectionDetails.setBrokerDetails(brokerList);
         }
+        
+        _logger.info("============================================================");
+        _logger.info("Updated cluster membership details " + _connectionDetails);
+        _logger.info("============================================================");
     }
     
     public void attainedConnection()
     {
-        super.attainedConnection();
         try
         {
+            _failedAttemps = 0;
+            _logger.info("============================================================");
+            _logger.info("Attained connection ");
+            _logger.info("============================================================");
             subscribeForUpdates();
         }
         catch (JMSException e)
@@ -151,17 +189,92 @@
     {
         synchronized (_brokerListLock)
         {
-            return super.getCurrentBrokerDetails();
+            return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
         }
-    }
-
+    }   
+    
     public BrokerDetails getNextBrokerDetails()
     {
         synchronized(_brokerListLock)
         {
-            return super.getNextBrokerDetails();
+            if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1))
+            {
+                _currentBrokerIndex = 0;
+            }
+            else
+            {
+                _currentBrokerIndex++;
+            }
+            
+            BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+            
+            // When the broker list is updated it will include the current broker as well
+            // There is no point trying it again, so trying the next one.
+            if (_currentBrokerDetail != null &&
+                broker.getHost().equals(_currentBrokerDetail.getHost()) &&
+                broker.getPort() == _currentBrokerDetail.getPort())
+            {
+                return getNextBrokerDetails();
+            }
+
+            String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
+            if (delayStr != null)
+            {
+                Long delay = Long.parseLong(delayStr);
+                _logger.info("Delay between connect retries:" + delay);
+                try
+                {
+                    Thread.sleep(delay);
+                }
+                catch (InterruptedException ie)
+                {
+                    return null;
+                }
+            }
+            else
+            {
+                _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
+            }
+
+            _failedAttemps ++;
+            _currentBrokerDetail = broker;
+            return broker;            
         }
     }
+    
+    public boolean failoverAllowed()
+    {
+        // We allow to Failover provided 
+        // our broker list is not empty and
+        // we haven't gone through all of them  
+               
+        boolean b = _connectionDetails.getBrokerCount() > 0 &&
+               _failedAttemps <= _connectionDetails.getBrokerCount();
+        
+        
+        _logger.info("============================================================");
+        _logger.info(toString());
+        _logger.info("FailoverAllowed " + b);
+        _logger.info("============================================================");
+        
+        return b;
+    }
+    
+    public void reset()
+    {
+        _failedAttemps = 0;
+    }
+    
+    public void setBroker(BrokerDetails broker)
+    {
+        // not sure if this method is needed
+    }
+
+    public void setRetries(int maxRetries)
+    {
+        // no max retries we keep trying as long
+        // as we get updates
+    }
 
     public String methodName()
     {
@@ -172,7 +285,24 @@
     {
         StringBuffer sb = new StringBuffer();
         sb.append("FailoverExchange:\n");
-        sb.append(super.toString());
+        sb.append("\n Current Broker Index:");
+        sb.append(_currentBrokerIndex);
+        sb.append("\n Failed Attempts:");
+        sb.append(_failedAttemps);
+        sb.append("\n Orignal broker details:");
+        sb.append(_originalBrokerDetail).append("\n");
+        sb.append("\n -------- Broker List -----------\n");
+        for (int i = 0; i < _connectionDetails.getBrokerCount(); i++)
+        {
+            if (i == _currentBrokerIndex)
+            {
+                sb.append(">");
+            }
+
+            sb.append(_connectionDetails.getBrokerDetails(i));
+            sb.append("\n");
+        }
+        sb.append("--------------------------------\n");
         return sb.toString();
     }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org