You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/02/11 21:42:08 UTC

svn commit: r1069950 - /camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java

Author: cmueller
Date: Fri Feb 11 20:42:08 2011
New Revision: 1069950

URL: http://svn.apache.org/viewvc?rev=1069950&view=rev
Log:
CAMEL-3650: SMSC-initiated unbind spawns an exponentially growing number of reconnect threads

Modified:
    camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java

Modified: camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java?rev=1069950&r1=1069949&r2=1069950&view=diff
==============================================================================
--- camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java (original)
+++ camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java Fri Feb 11 20:42:08 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
@@ -53,6 +54,7 @@ public class SmppProducer extends Defaul
     private SmppConfiguration configuration;
     private SMPPSession session;
     private SessionStateListener sessionStateListener;
+    private final ReentrantLock reconnectLock = new ReentrantLock();
 
     public SmppProducer(SmppEndpoint endpoint, SmppConfiguration config) {
         super(endpoint);
@@ -185,32 +187,38 @@ public class SmppProducer extends Defaul
     }
 
     private void reconnect(final long initialReconnectDelay) {
-        new Thread() {
-            @Override
-            public void run() {
-                LOG.info("Schedule reconnect after " + initialReconnectDelay + " millis");
-                try {
-                    Thread.sleep(initialReconnectDelay);
-                } catch (InterruptedException e) {
-                }
-
-                int attempt = 0;
-                while (!(isStopping() || isStopped()) && (session == null || session.getSessionState().equals(SessionState.CLOSED))) {
+        if (reconnectLock.tryLock()) {
+            new Thread() {
+                @Override
+                public void run() {
                     try {
-                        LOG.info("Trying to reconnect to " + getEndpoint().getConnectionString() + " - attempt #" + (++attempt) + "...");
-                        session = createSession();
-                    } catch (IOException e) {
-                        LOG.info("Failed to reconnect to " + getEndpoint().getConnectionString());
-                        closeSession(session);
+                        LOG.info("Schedule reconnect after " + initialReconnectDelay + " millis");
                         try {
-                            Thread.sleep(configuration.getReconnectDelay());
-                        } catch (InterruptedException ee) {
+                            Thread.sleep(initialReconnectDelay);
+                        } catch (InterruptedException e) {
+                        }
+
+                        int attempt = 0;
+                        while (!(isStopping() || isStopped()) && (session == null || session.getSessionState().equals(SessionState.CLOSED))) {
+                            try {
+                                LOG.info("Trying to reconnect to " + getEndpoint().getConnectionString() + " - attempt #" + (++attempt) + "...");
+                                session = createSession();
+                            } catch (IOException e) {
+                                LOG.info("Failed to reconnect to " + getEndpoint().getConnectionString());
+                                closeSession(session);
+                                try {
+                                    Thread.sleep(configuration.getReconnectDelay());
+                                } catch (InterruptedException ee) {
+                                }
+                            }
                         }
+                        LOG.info("Reconnected to " + getEndpoint().getConnectionString());                        
+                    } finally {
+                        reconnectLock.unlock();
                     }
                 }
-                LOG.info("Reconnected to " + getEndpoint().getConnectionString());
-            }
-        }.start();
+            }.start();            
+        }
     }
     
     @Override