You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/04 07:49:20 UTC

svn commit: r960296 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/FailoverTransport.java test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java

Author: rajdavies
Date: Sun Jul  4 05:49:20 2010
New Revision: 960296

URL: http://svn.apache.org/viewvc?rev=960296&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2807

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=960296&r1=960295&r2=960296&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Sun Jul  4 05:49:20 2010
@@ -17,10 +17,16 @@
 
 package org.apache.activemq.transport.failover;
 
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.InterruptedIOException;
 import java.net.InetAddress;
+import java.net.MalformedURLException;
 import java.net.URI;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,7 +37,6 @@ import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
@@ -113,6 +118,8 @@ public class FailoverTransport implement
     private boolean reconnectSupported=true;
     // remember for reconnect thread
     private SslContext brokerSslContext;
+    private String updateURIsURL = null;
+    private boolean rebalanceUpdateURIs=true;
 
     public FailoverTransport() throws InterruptedIOException {
         brokerSslContext = SslContext.getCurrentSslContext();
@@ -257,12 +264,15 @@ public class FailoverTransport implement
                 }
             }
         }
-        String connectedStr = control.getConnectedBrokers();
-        if (connectedStr != null) {
-            connectedStr = connectedStr.trim();
-            if (connectedStr.length() > 0 && isUpdateURIsSupported()) {
+        processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
+    }
+
+    private final void processNewTransports(boolean rebalance, String newTransports) {
+        if (newTransports != null) {
+            newTransports = newTransports.trim();
+            if (newTransports.length() > 0 && isUpdateURIsSupported()) {
                 List<URI> list = new ArrayList<URI>();
-                StringTokenizer tokenizer = new StringTokenizer(connectedStr, ",");
+                StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
                 while (tokenizer.hasMoreTokens()) {
                     String str = tokenizer.nextToken();
                     try {
@@ -274,9 +284,9 @@ public class FailoverTransport implement
                 }
                 if (list.isEmpty() == false) {
                     try {
-                        updateURIs(control.isRebalanceConnection(), list.toArray(new URI[list.size()]));
+                        updateURIs(rebalance, list.toArray(new URI[list.size()]));
                     } catch (IOException e) {
-                        LOG.error("Failed to update transport URI's from: " + connectedStr, e);
+                        LOG.error("Failed to update transport URI's from: " + newTransports, e);
                     }
                 }
 
@@ -752,6 +762,40 @@ public class FailoverTransport implement
         Exception failure = null;
         synchronized (reconnectMutex) {
 
+            // If updateURIsURL is specified, read the file and add any new
+            // transport URI's to this FailOverTransport. 
+            // Note: Could track file timestamp to avoid unnecessary reading.
+            String fileURL = getUpdateURIsURL();
+            if (fileURL != null) {
+                BufferedReader in = null;
+                String newUris = null;
+                StringBuffer buffer = new StringBuffer();
+
+                try {
+                    in = new BufferedReader(getURLStream(fileURL));
+                    while (true) {
+                        String line = in.readLine();
+                        if (line == null) {
+                            break;
+                        }
+                        buffer.append(line);
+                    }
+                    newUris = buffer.toString();
+                } catch (IOException ioe) {
+                    LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
+                } finally {
+                    if (in != null) {
+                        try {
+                            in.close();
+                        } catch (IOException ioe) {
+                            // ignore
+                        }
+                    }
+                }
+                
+                processNewTransports(isRebalanceUpdateURIs(), newUris);
+            }
+
             if (disposed || connectionFailure != null) {
                 reconnectMutex.notifyAll();
             }
@@ -1006,6 +1050,34 @@ public class FailoverTransport implement
             }
         }
     }
+    
+    /**
+     * @return the updateURIsURL
+     */
+    public String getUpdateURIsURL() {
+        return this.updateURIsURL;
+    }
+
+    /**
+     * @param updateURIsURL the updateURIsURL to set
+     */
+    public void setUpdateURIsURL(String updateURIsURL) {
+        this.updateURIsURL = updateURIsURL;
+    }
+    
+    /**
+     * @return the rebalanceUpdateURIs
+     */
+    public boolean isRebalanceUpdateURIs() {
+        return this.rebalanceUpdateURIs;
+    }
+
+    /**
+     * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
+     */
+    public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
+        this.rebalanceUpdateURIs = rebalanceUpdateURIs;
+    }
 
     public int getReceiveCounter() {
         Transport transport = connectedTransport.get();
@@ -1045,4 +1117,19 @@ public class FailoverTransport implement
         }
         return result;
     }
+    
+    private InputStreamReader getURLStream(String path) throws IOException {
+        InputStreamReader result = null;
+        URL url = null;
+        try {
+            url = new URL(path);
+            result = new InputStreamReader(url.openStream());
+        } catch (MalformedURLException e) {
+            // ignore - it could be a path to a a local file
+        }
+        if (result == null) {
+            result = new FileReader(path);
+        }
+        return result;
+    }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java?rev=960296&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java Sun Jul  4 05:49:20 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+public class FailoverUpdateURIsTest extends TestCase {
+	
+	private static final String QUEUE_NAME = "test.failoverupdateuris";
+
+	public void testUpdateURIs() throws Exception {
+		
+		long timeout = 1000;
+		URI firstTcpUri = new URI("tcp://localhost:61616");
+		URI secondTcpUri = new URI("tcp://localhost:61626");
+                String targetDir = "target/" + getName();
+                new File(targetDir).mkdir();
+                File updateFile = new File(targetDir + "/updateURIsFile.txt");
+                System.out.println(updateFile);
+                System.out.println(updateFile.toURI());
+                System.out.println(updateFile.getAbsoluteFile());
+                System.out.println(updateFile.getAbsoluteFile().toURI());
+                FileOutputStream out = new FileOutputStream(updateFile);
+                out.write(firstTcpUri.toString().getBytes());
+                out.close();
+                              
+		BrokerService bs1 = new BrokerService();
+		bs1.setUseJmx(false);
+		bs1.addConnector(firstTcpUri);
+		bs1.start();
+
+                // no failover uri's to start with, must be read from file...
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
+		Connection connection = cf.createConnection();
+                connection.start();
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Queue theQueue = session.createQueue(QUEUE_NAME);
+		MessageProducer producer = session.createProducer(theQueue);
+		MessageConsumer consumer = session.createConsumer(theQueue);
+		Message message = session.createTextMessage("Test message");
+		producer.send(message);
+                Message msg = consumer.receive(2000);
+                assertNotNull(msg);
+		
+		bs1.stop();
+                bs1.waitUntilStopped();
+
+		BrokerService bs2 = new BrokerService();
+		bs2.setUseJmx(false);
+		bs2.addConnector(secondTcpUri);
+		bs2.start();
+		
+		// add the transport uri for broker number 2
+                out = new FileOutputStream(updateFile, true);
+                out.write(",".getBytes());
+                out.write(secondTcpUri.toString().getBytes());
+                out.close();
+
+                producer.send(message);
+                msg = consumer.receive(2000);
+                assertNotNull(msg);
+	}
+	
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain