You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/02/11 21:42:08 UTC
svn commit: r1069950 -
/camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
Author: cmueller
Date: Fri Feb 11 20:42:08 2011
New Revision: 1069950
URL: http://svn.apache.org/viewvc?rev=1069950&view=rev
Log:
CAMEL-3650: SMSC initiated unbind spawns exponential amounts of reconnect threads
Modified:
camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
Modified: camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java?rev=1069950&r1=1069949&r2=1069950&view=diff
==============================================================================
--- camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java (original)
+++ camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java Fri Feb 11 20:42:08 2011
@@ -17,6 +17,7 @@
package org.apache.camel.component.smpp;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
@@ -53,6 +54,7 @@ public class SmppProducer extends Defaul
private SmppConfiguration configuration;
private SMPPSession session;
private SessionStateListener sessionStateListener;
+ private final ReentrantLock reconnectLock = new ReentrantLock();
public SmppProducer(SmppEndpoint endpoint, SmppConfiguration config) {
super(endpoint);
@@ -185,32 +187,38 @@ public class SmppProducer extends Defaul
}
private void reconnect(final long initialReconnectDelay) {
- new Thread() {
- @Override
- public void run() {
- LOG.info("Schedule reconnect after " + initialReconnectDelay + " millis");
- try {
- Thread.sleep(initialReconnectDelay);
- } catch (InterruptedException e) {
- }
-
- int attempt = 0;
- while (!(isStopping() || isStopped()) && (session == null || session.getSessionState().equals(SessionState.CLOSED))) {
+ if (reconnectLock.tryLock()) {
+ new Thread() {
+ @Override
+ public void run() {
try {
- LOG.info("Trying to reconnect to " + getEndpoint().getConnectionString() + " - attempt #" + (++attempt) + "...");
- session = createSession();
- } catch (IOException e) {
- LOG.info("Failed to reconnect to " + getEndpoint().getConnectionString());
- closeSession(session);
+ LOG.info("Schedule reconnect after " + initialReconnectDelay + " millis");
try {
- Thread.sleep(configuration.getReconnectDelay());
- } catch (InterruptedException ee) {
+ Thread.sleep(initialReconnectDelay);
+ } catch (InterruptedException e) {
+ }
+
+ int attempt = 0;
+ while (!(isStopping() || isStopped()) && (session == null || session.getSessionState().equals(SessionState.CLOSED))) {
+ try {
+ LOG.info("Trying to reconnect to " + getEndpoint().getConnectionString() + " - attempt #" + (++attempt) + "...");
+ session = createSession();
+ } catch (IOException e) {
+ LOG.info("Failed to reconnect to " + getEndpoint().getConnectionString());
+ closeSession(session);
+ try {
+ Thread.sleep(configuration.getReconnectDelay());
+ } catch (InterruptedException ee) {
+ }
+ }
}
+ LOG.info("Reconnected to " + getEndpoint().getConnectionString());
+ } finally {
+ reconnectLock.unlock();
}
}
- LOG.info("Reconnected to " + getEndpoint().getConnectionString());
- }
- }.start();
+ }.start();
+ }
}
@Override