You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/05/12 19:13:11 UTC

svn commit: r773985 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/FailoverTransport.java test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java

Author: gtully
Date: Tue May 12 17:13:10 2009
New Revision: 773985

URL: http://svn.apache.org/viewvc?rev=773985&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2246

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=773985&r1=773984&r2=773985&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Tue May 12 17:13:10 2009
@@ -700,9 +700,10 @@
                     Iterator<URI> iter = connectList.iterator();
                     while(iter.hasNext() && connectedTransport.get() == null && !disposed) {
                         URI uri = iter.next();
+                        Transport t = null;
                         try {
                             LOG.debug("Attempting connect to: " + uri);
-                            Transport t = TransportFactory.compositeConnect(uri);
+                            t = TransportFactory.compositeConnect(uri);
                             t.setTransportListener(myTransportListener);
                             t.start();
                             
@@ -743,6 +744,13 @@
                         } catch (Exception e) {
                             failure = e;
                             LOG.debug("Connect fail to: " + uri + ", reason: " + e);
+                            if (t!=null) {
+                                try {
+                                    t.stop();       
+                                } catch (Exception ee) {
+                                    LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
+                                }
+                            }
                         }
                     }
                 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java?rev=773985&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java Tue May 12 17:13:10 2009
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.*;
+
+import javax.jms.Connection;
+import javax.net.ServerSocketFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+public class SlowConnectionTest extends TestCase {
+    // Counts "ActiveMQ Transport" threads left after failover retries against a broker that never answers (AMQ-2246).
+    public void testSlowConnection() throws Exception {
+        // A single short timeout (ms) is reused for every client-side setting in the URI below.
+        int timeout = 1000;
+        URI tcpUri = new URI("tcp://localhost:61616?soTimeout=" + timeout + "&trace=true&connectionTimeout=" + timeout + "&wireFormat.maxInactivityDurationInitalDelay=" + timeout);
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")");
+        final Connection connection = cf.createConnection();
+
+        MockBroker broker = new MockBroker();
+        broker.start();
+        // start() runs off the test thread; whatever it throws is deliberately ignored.
+        new Thread(new Runnable() {
+            public void run() {
+                try { connection.start(); } catch (Throwable ignored) {}
+            }
+        }).start();
+        // Wait five timeout periods before sampling the live threads.
+        Thread.sleep(timeout * 5);
+
+        int count = 0;
+        for (Thread thread : Thread.getAllStackTraces().keySet()) {
+            if (thread.getName().contains("ActiveMQ Transport")) { count++; }
+        }
+        // The mock broker is shut down before asserting so a failed assertion does not leave it running.
+        broker.interrupt();
+        broker.join();
+
+        assertTrue("Transport count: " + count + ", expected <= 1", count <= 1);
+    }
+
+    class MockBroker extends Thread {
+        // Accepts TCP connections on port 61616 and never reads from or writes to them.
+        public void run() {
+            // Accepted sockets are kept open until this thread shuts down.
+            List<Socket> inProgress = new ArrayList<Socket>();
+            ServerSocketFactory factory = ServerSocketFactory.getDefault();
+            ServerSocket ss = null;
+
+            try {
+                ss = factory.createServerSocket(61616);
+                ss.setSoTimeout(100);    // a blocking accept() is not woken by interrupt(); poll so the flag is re-checked
+                while (!interrupted()) {
+                    try { inProgress.add(ss.accept()); } catch (SocketTimeoutException poll) {}    // eat socket
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                if (ss != null) { try { ss.close(); } catch (IOException ignored) {} }    // ss is null when the bind failed
+                for (Socket s : inProgress) {
+                    try { s.close(); } catch (IOException ignored) {}
+                }
+            }
+        }
+    }
+}
+

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date