You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/04 07:49:20 UTC
svn commit: r960296 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/failover/FailoverTransport.java
test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
Author: rajdavies
Date: Sun Jul 4 05:49:20 2010
New Revision: 960296
URL: http://svn.apache.org/viewvc?rev=960296&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2807
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=960296&r1=960295&r2=960296&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Sun Jul 4 05:49:20 2010
@@ -17,10 +17,16 @@
package org.apache.activemq.transport.failover;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.net.InetAddress;
+import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -31,7 +37,6 @@ import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@@ -113,6 +118,8 @@ public class FailoverTransport implement
private boolean reconnectSupported=true;
// remember for reconnect thread
private SslContext brokerSslContext;
+ private String updateURIsURL = null;
+ private boolean rebalanceUpdateURIs=true;
public FailoverTransport() throws InterruptedIOException {
brokerSslContext = SslContext.getCurrentSslContext();
@@ -257,12 +264,15 @@ public class FailoverTransport implement
}
}
}
- String connectedStr = control.getConnectedBrokers();
- if (connectedStr != null) {
- connectedStr = connectedStr.trim();
- if (connectedStr.length() > 0 && isUpdateURIsSupported()) {
+ processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
+ }
+
+ private final void processNewTransports(boolean rebalance, String newTransports) {
+ if (newTransports != null) {
+ newTransports = newTransports.trim();
+ if (newTransports.length() > 0 && isUpdateURIsSupported()) {
List<URI> list = new ArrayList<URI>();
- StringTokenizer tokenizer = new StringTokenizer(connectedStr, ",");
+ StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
while (tokenizer.hasMoreTokens()) {
String str = tokenizer.nextToken();
try {
@@ -274,9 +284,9 @@ public class FailoverTransport implement
}
if (list.isEmpty() == false) {
try {
- updateURIs(control.isRebalanceConnection(), list.toArray(new URI[list.size()]));
+ updateURIs(rebalance, list.toArray(new URI[list.size()]));
} catch (IOException e) {
- LOG.error("Failed to update transport URI's from: " + connectedStr, e);
+ LOG.error("Failed to update transport URI's from: " + newTransports, e);
}
}
@@ -752,6 +762,40 @@ public class FailoverTransport implement
Exception failure = null;
synchronized (reconnectMutex) {
+ // If updateURIsURL is specified, read the file and add any new
+ // transport URI's to this FailOverTransport.
+ // Note: Could track file timestamp to avoid unnecessary reading.
+ String fileURL = getUpdateURIsURL();
+ if (fileURL != null) {
+ BufferedReader in = null;
+ String newUris = null;
+ StringBuffer buffer = new StringBuffer();
+
+ try {
+ in = new BufferedReader(getURLStream(fileURL));
+ while (true) {
+ String line = in.readLine();
+ if (line == null) {
+ break;
+ }
+ buffer.append(line);
+ }
+ newUris = buffer.toString();
+ } catch (IOException ioe) {
+ LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException ioe) {
+ // ignore
+ }
+ }
+ }
+
+ processNewTransports(isRebalanceUpdateURIs(), newUris);
+ }
+
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
@@ -1006,6 +1050,34 @@ public class FailoverTransport implement
}
}
}
+
+ /**
+ * @return the URL or local file path that is read for additional transport URIs, or null if none is set
+ */
+ public String getUpdateURIsURL() {
+ return this.updateURIsURL;
+ }
+
+ /**
+ * @param updateURIsURL the URL or local file path to read a comma-separated list of transport URIs from
+ */
+ public void setUpdateURIsURL(String updateURIsURL) {
+ this.updateURIsURL = updateURIsURL;
+ }
+
+ /**
+ * @return the rebalance flag passed along when transport URIs read from updateURIsURL are applied
+ */
+ public boolean isRebalanceUpdateURIs() {
+ return this.rebalanceUpdateURIs;
+ }
+
+ /**
+ * @param rebalanceUpdateURIs the rebalance flag to pass along when transport URIs read from updateURIsURL are applied
+ */
+ public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
+ this.rebalanceUpdateURIs = rebalanceUpdateURIs;
+ }
public int getReceiveCounter() {
Transport transport = connectedTransport.get();
@@ -1045,4 +1117,19 @@ public class FailoverTransport implement
}
return result;
}
+
+ private InputStreamReader getURLStream(String path) throws IOException {
+ InputStreamReader result = null;
+ URL url = null;
+ try {
+ url = new URL(path);
+ result = new InputStreamReader(url.openStream());
+ } catch (MalformedURLException e) {
+ // ignore - not a valid URL, so it could be a path to a local file (handled below)
+ }
+ if (result == null) {
+ result = new FileReader(path);
+ }
+ return result;
+ }
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java?rev=960296&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java Sun Jul 4 05:49:20 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+public class FailoverUpdateURIsTest extends TestCase {
+
+ private static final String QUEUE_NAME = "test.failoverupdateuris";
+
+ public void testUpdateURIs() throws Exception {
+
+ long timeout = 1000;
+ URI firstTcpUri = new URI("tcp://localhost:61616");
+ URI secondTcpUri = new URI("tcp://localhost:61626");
+ String targetDir = "target/" + getName();
+ new File(targetDir).mkdir();
+ File updateFile = new File(targetDir + "/updateURIsFile.txt");
+ System.out.println(updateFile);
+ System.out.println(updateFile.toURI());
+ System.out.println(updateFile.getAbsoluteFile());
+ System.out.println(updateFile.getAbsoluteFile().toURI());
+ FileOutputStream out = new FileOutputStream(updateFile);
+ out.write(firstTcpUri.toString().getBytes());
+ out.close();
+
+ BrokerService bs1 = new BrokerService();
+ bs1.setUseJmx(false);
+ bs1.addConnector(firstTcpUri);
+ bs1.start();
+
+ // No failover URIs are configured up front; they must be read from the update file.
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue theQueue = session.createQueue(QUEUE_NAME);
+ MessageProducer producer = session.createProducer(theQueue);
+ MessageConsumer consumer = session.createConsumer(theQueue);
+ Message message = session.createTextMessage("Test message");
+ producer.send(message);
+ Message msg = consumer.receive(2000);
+ assertNotNull(msg);
+
+ bs1.stop();
+ bs1.waitUntilStopped();
+
+ BrokerService bs2 = new BrokerService();
+ bs2.setUseJmx(false);
+ bs2.addConnector(secondTcpUri);
+ bs2.start();
+
+ // Append the transport URI for the second broker to the update file (comma-separated).
+ out = new FileOutputStream(updateFile, true);
+ out.write(",".getBytes());
+ out.write(secondTcpUri.toString().getBytes());
+ out.close();
+
+ producer.send(message);
+ msg = consumer.receive(2000);
+ assertNotNull(msg);
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain