You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gg...@apache.org on 2015/05/07 20:10:36 UTC

[1/2] camel git commit: [CAMEL-8718] Prevent FTP(s) connection leak after failed auth (producer&consumer)

Repository: camel
Updated Branches:
  refs/heads/camel-2.14.x ff476a30f -> 3f44084b3
  refs/heads/master d3dbda259 -> e1b6592a1


[CAMEL-8718] Prevent FTP(s) connection leak after failed auth (producer&consumer)

(cherry picked from commit 8c6dce78c11c91d070ba1894c0a4d71336ad4e03)


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e1b6592a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e1b6592a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e1b6592a

Branch: refs/heads/master
Commit: e1b6592a18362a5903c88923d18abb3430c1c630
Parents: d3dbda2
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Thu May 7 20:08:35 2015 +0200
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Thu May 7 20:09:53 2015 +0200

----------------------------------------------------------------------
 .../component/file/remote/FtpOperations.java    |   2 +
 .../file/remote/RemoteFileConsumer.java         |  11 --
 .../remote/FtpBadLoginConnectionLeakTest.java   |  18 +--
 ...FtpBadLoginInProducerConnectionLeakTest.java | 124 +++++++++++++++
 .../FtpBadLoginMockNoopConnectionLeakTest.java  | 152 +++++++++++++++++++
 5 files changed, 285 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e1b6592a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
index 74dbf6f..0528362 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
@@ -175,6 +175,8 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
             }
             log.trace("User {} logged in: {}", username != null ? username : "anonymous", login);
             if (!login) {
+                // disconnect to prevent connection leaks; NOTE(review): disconnect() may reset the reply string read below - confirm
+                client.disconnect();
                 throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString());
             }
             client.setFileType(configuration.isBinary() ? FTP.BINARY_FILE_TYPE : FTP.ASCII_FILE_TYPE);

http://git-wip-us.apache.org/repos/asf/camel/blob/e1b6592a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index bf28f14..2f58c93 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -196,17 +196,6 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
             }
         }
 
-        try {
-            // we may as well be connected, but not logged in. let's disconnect to prevent connection leak
-            if (!isConnected && getOperations().isConnected()) {
-                getOperations().disconnect();
-            }
-        } catch (Exception ex) {
-            if (log.isDebugEnabled()) {
-                log.debug("Exception during disconnect: " + ex.getMessage());
-            }
-        }
-
         if (!loggedIn || !isConnected) {
             if (log.isDebugEnabled()) {
                 log.debug("Not connected/logged in, connecting to: {}", remoteServer());

http://git-wip-us.apache.org/repos/asf/camel/blob/e1b6592a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
index 54b1574..ff430ee 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
@@ -22,7 +22,6 @@ import java.net.Socket;
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.SocketFactory;
 
 import org.apache.camel.builder.RouteBuilder;
@@ -32,24 +31,20 @@ import org.junit.Test;
 
 public class FtpBadLoginConnectionLeakTest extends FtpServerTestSupport {
 
-    private final AtomicInteger exceptionCount = new AtomicInteger(0);
-
-    private String getFtpUrl() {
-        return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=0" +
-                "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
-    }
-
     /**
      * Mapping of socket hashcode to two element tab ([connect() called, close() called])
      */
     private Map<Integer, boolean[]> socketAudits = new HashMap<>();
 
-    @Override
+    private String getFtpUrl() {
+        return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber" +
+                "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
+    }
 
+    @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry jndi = super.createRegistry();
 
-        final SocketFactory defaultSocketFactory = SocketFactory.getDefault();
         SocketFactory sf = new AuditingSocketFactory();
         jndi.bind("sf", sf);
         return jndi;
@@ -67,12 +62,13 @@ public class FtpBadLoginConnectionLeakTest extends FtpServerTestSupport {
 
         for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) {
             assertTrue("Socket should be connected", socketStats.getValue()[0]);
-            assertEquals("Socket should be closed", socketStats.getValue()[1], socketStats.getValue()[0]);
+            assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]);
         }
 
         mock.assertIsSatisfied();
     }
 
+    @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/e1b6592a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java
new file mode 100644
index 0000000..e3314d0
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.SocketFactory;
+
+import org.apache.camel.component.file.GenericFileOperationFailedException;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class FtpBadLoginInProducerConnectionLeakTest extends FtpServerTestSupport {
+
+    /**
+     * Mapping of socket identity hash code to a two-element array ([connect() called, close() called])
+     */
+    private Map<Integer, boolean[]> socketAudits = new HashMap<>();
+
+    private String getFtpUrl() {
+        return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=3" +
+                "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        SocketFactory sf = new AuditingSocketFactory();
+        jndi.bind("sf", sf);
+        return jndi;
+    }
+
+    @Test
+    public void testConnectionLeak() throws Exception {
+        for (String filename : new String[] { "claus.txt", "grzegorz.txt" }) {
+            try {
+                sendFile(getFtpUrl(), "Hello World", filename);
+            } catch (Exception ignored) {
+                // expected
+            }
+        }
+
+        // maximumReconnectAttempts applies to TCP connects, not to FTP login attempts,
+        // but setting it > 0 leads to two connection attempts per send (2 files -> 4 sockets)
+        assertEquals("Expected 4 socket connections to be created", 4, socketAudits.size());
+
+        for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) {
+            assertTrue("Socket should be connected", socketStats.getValue()[0]);
+            assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]);
+        }
+    }
+
+    /**
+     * {@link SocketFactory} which creates {@link Socket}s that expose statistics about {@link Socket#connect(SocketAddress)}/{@link Socket#close()}
+     * invocations
+     */
+    private class AuditingSocketFactory extends SocketFactory {
+
+        @Override
+        public Socket createSocket(String s, int i) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket(InetAddress inetAddress, int i) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket() throws IOException {
+            AuditingSocket socket = new AuditingSocket();
+            socketAudits.put(System.identityHashCode(socket), new boolean[] { false, false });
+            return socket;
+        }
+
+        @Override
+        public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException {
+            return null;
+        }
+    }
+
+    /**
+     * {@link Socket} which records whether connect()/close() have been invoked
+     */
+    private class AuditingSocket extends Socket {
+
+        @Override
+        public void connect(SocketAddress endpoint, int timeout) throws IOException {
+            super.connect(endpoint, timeout);
+            socketAudits.get(System.identityHashCode(this))[0] = true;
+        }
+
+        @Override
+        public synchronized void close() throws IOException {
+            super.close();
+            socketAudits.get(System.identityHashCode(this))[1] = true;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e1b6592a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java
new file mode 100644
index 0000000..0b895f5
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.SocketFactory;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.commons.net.ftp.FTPClient;
+import org.junit.Test;
+
+/**
+ * Test which checks for leaked connections when the NOOP operation reports success (mocked to return true while connected).
+ */
+public class FtpBadLoginMockNoopConnectionLeakTest extends FtpServerTestSupport {
+
+    private String getFtpUrl() {
+        return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=3" +
+                "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl(), FtpEndpoint.class);
+        endpoint.setFtpClient(new FTPClient() {
+            @Override
+            public boolean sendNoOp() throws IOException {
+                // return true as long as connection is established
+                return this.isConnected();
+            }
+        });
+    }
+
+    /**
+     * Mapping of socket identity hash code to a two-element array ([connect() called, close() called])
+     */
+    private Map<Integer, boolean[]> socketAudits = new HashMap<>();
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        SocketFactory sf = new AuditingSocketFactory();
+        jndi.bind("sf", sf);
+        return jndi;
+    }
+
+    @Test
+    public void testConnectionLeak() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        // let's have several login attempts
+        Thread.sleep(3000L);
+
+        stopCamelContext();
+
+        for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) {
+            assertTrue("Socket should be connected", socketStats.getValue()[0]);
+            assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]);
+        }
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(getFtpUrl()).to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * {@link SocketFactory} which creates {@link Socket}s that expose statistics about {@link Socket#connect(SocketAddress)}/{@link Socket#close()}
+     * invocations
+     */
+    private class AuditingSocketFactory extends SocketFactory {
+
+        @Override
+        public Socket createSocket(String s, int i) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket(InetAddress inetAddress, int i) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket() throws IOException {
+            AuditingSocket socket = new AuditingSocket();
+            socketAudits.put(System.identityHashCode(socket), new boolean[] { false, false });
+            return socket;
+        }
+
+        @Override
+        public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException {
+            return null;
+        }
+    }
+
+    /**
+     * {@link Socket} which records whether connect()/close() have been invoked
+     */
+    private class AuditingSocket extends Socket {
+
+        @Override
+        public void connect(SocketAddress endpoint, int timeout) throws IOException {
+            log.info("Connecting socket {}", System.identityHashCode(this));
+            super.connect(endpoint, timeout);
+            socketAudits.get(System.identityHashCode(this))[0] = true;
+        }
+
+        @Override
+        public synchronized void close() throws IOException {
+            log.info("Disconnecting socket {}", System.identityHashCode(this));
+            super.close();
+            socketAudits.get(System.identityHashCode(this))[1] = true;
+        }
+    }
+
+}


[2/2] camel git commit: [CAMEL-8718] Prevent FTP(s) connection leak after failed auth (producer&consumer)

Posted by gg...@apache.org.
[CAMEL-8718] Prevent FTP(s) connection leak after failed auth (producer&consumer)

(cherry picked from commit 8c6dce78c11c91d070ba1894c0a4d71336ad4e03)


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f44084b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f44084b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f44084b

Branch: refs/heads/camel-2.14.x
Commit: 3f44084b38030e2fb6bd6bf399e1afedc4382d5a
Parents: ff476a3
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Thu May 7 20:08:35 2015 +0200
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Thu May 7 20:10:19 2015 +0200

----------------------------------------------------------------------
 .../component/file/remote/FtpOperations.java    |   2 +
 .../file/remote/RemoteFileConsumer.java         |  11 --
 .../remote/FtpBadLoginConnectionLeakTest.java   |  18 +--
 ...FtpBadLoginInProducerConnectionLeakTest.java | 124 +++++++++++++++
 .../FtpBadLoginMockNoopConnectionLeakTest.java  | 152 +++++++++++++++++++
 5 files changed, 285 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
index b5384ef..bcca1a0 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
@@ -163,6 +163,8 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
             }
             log.trace("User {} logged in: {}", username != null ? username : "anonymous", login);
             if (!login) {
+                // disconnect to prevent connection leaks; NOTE(review): disconnect() may reset the reply string read below - confirm
+                client.disconnect();
                 throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString());
             }
             client.setFileType(configuration.isBinary() ? FTP.BINARY_FILE_TYPE : FTP.ASCII_FILE_TYPE);

http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index 9408ea4..df15b55 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -160,17 +160,6 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
             }
         }
 
-        try {
-            // we may as well be connected, but not logged in. let's disconnect to prevent connection leak
-            if (!isConnected && getOperations().isConnected()) {
-                getOperations().disconnect();
-            }
-        } catch (Exception ex) {
-            if (log.isDebugEnabled()) {
-                log.debug("Exception during disconnect: " + ex.getMessage());
-            }
-        }
-
         if (!loggedIn || !isConnected) {
             if (log.isDebugEnabled()) {
                 log.debug("Not connected/logged in, connecting to: {}", remoteServer());

http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
index 18eb877..13c8a16 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
@@ -22,7 +22,6 @@ import java.net.Socket;
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.SocketFactory;
 
 import org.apache.camel.builder.RouteBuilder;
@@ -32,24 +31,20 @@ import org.junit.Test;
 
 public class FtpBadLoginConnectionLeakTest extends FtpServerTestSupport {
 
-    private final AtomicInteger exceptionCount = new AtomicInteger(0);
-
-    private String getFtpUrl() {
-        return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=0" +
-                "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
-    }
-
     /**
      * Mapping of socket hashcode to two element tab ([connect() called, close() called])
      */
     private Map<Integer, boolean[]> socketAudits = new HashMap<Integer, boolean[]>();
 
-    @Override
+    private String getFtpUrl() {
+        return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber" +
+                "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
+    }
 
+    @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry jndi = super.createRegistry();
 
-        final SocketFactory defaultSocketFactory = SocketFactory.getDefault();
         SocketFactory sf = new AuditingSocketFactory();
         jndi.bind("sf", sf);
         return jndi;
@@ -67,12 +62,13 @@ public class FtpBadLoginConnectionLeakTest extends FtpServerTestSupport {
 
         for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) {
             assertTrue("Socket should be connected", socketStats.getValue()[0]);
-            assertEquals("Socket should be closed", socketStats.getValue()[1], socketStats.getValue()[0]);
+            assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]);
         }
 
         mock.assertIsSatisfied();
     }
 
+    @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java
new file mode 100644
index 0000000..e3314d0
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.SocketFactory;
+
+import org.apache.camel.component.file.GenericFileOperationFailedException;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class FtpBadLoginInProducerConnectionLeakTest extends FtpServerTestSupport {
+
+    /**
+     * Mapping of socket identity hash code to a two-element array ([connect() called, close() called])
+     */
+    private Map<Integer, boolean[]> socketAudits = new HashMap<>();
+
+    private String getFtpUrl() {
+        return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=3" +
+                "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        SocketFactory sf = new AuditingSocketFactory();
+        jndi.bind("sf", sf);
+        return jndi;
+    }
+
+    @Test
+    public void testConnectionLeak() throws Exception {
+        for (String filename : new String[] { "claus.txt", "grzegorz.txt" }) {
+            try {
+                sendFile(getFtpUrl(), "Hello World", filename);
+            } catch (Exception ignored) {
+                // expected
+            }
+        }
+
+        // maximumReconnectAttempts applies to TCP connects, not to FTP login attempts,
+        // but setting it > 0 leads to two connection attempts per send (2 files -> 4 sockets)
+        assertEquals("Expected 4 socket connections to be created", 4, socketAudits.size());
+
+        for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) {
+            assertTrue("Socket should be connected", socketStats.getValue()[0]);
+            assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]);
+        }
+    }
+
+    /**
+     * {@link SocketFactory} which creates {@link Socket}s that expose statistics about {@link Socket#connect(SocketAddress)}/{@link Socket#close()}
+     * invocations
+     */
+    private class AuditingSocketFactory extends SocketFactory {
+
+        @Override
+        public Socket createSocket(String s, int i) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket(InetAddress inetAddress, int i) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket() throws IOException {
+            AuditingSocket socket = new AuditingSocket();
+            socketAudits.put(System.identityHashCode(socket), new boolean[] { false, false });
+            return socket;
+        }
+
+        @Override
+        public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException {
+            return null;
+        }
+    }
+
+    /**
+     * {@link Socket} which records whether connect()/close() have been invoked
+     */
+    private class AuditingSocket extends Socket {
+
+        @Override
+        public void connect(SocketAddress endpoint, int timeout) throws IOException {
+            super.connect(endpoint, timeout);
+            socketAudits.get(System.identityHashCode(this))[0] = true;
+        }
+
+        @Override
+        public synchronized void close() throws IOException {
+            super.close();
+            socketAudits.get(System.identityHashCode(this))[1] = true;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java
new file mode 100644
index 0000000..0b895f5
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.SocketFactory;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.commons.net.ftp.FTPClient;
+import org.junit.Test;
+
+/**
+ * Test which checks leaking connections when FTP server returns correct status for NOOP operation.
+ */
+public class FtpBadLoginMockNoopConnectionLeakTest extends FtpServerTestSupport {
+
+    private String getFtpUrl() {
+        return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=3" +
+                "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl(), FtpEndpoint.class);
+        endpoint.setFtpClient(new FTPClient() {
+            @Override
+            public boolean sendNoOp() throws IOException {
+                // return true as long as connection is established
+                return this.isConnected();
+            }
+        });
+    }
+
+    /**
+     * Mapping of socket hashcode to two element tab ([connect() called, close() called])
+     */
+    private Map<Integer, boolean[]> socketAudits = new HashMap<>();
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        SocketFactory sf = new AuditingSocketFactory();
+        jndi.bind("sf", sf);
+        return jndi;
+    }
+
+    @Test
+    public void testConnectionLeak() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        // let's have several login attempts
+        Thread.sleep(3000L);
+
+        stopCamelContext();
+
+        for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) {
+            assertTrue("Socket should be connected", socketStats.getValue()[0]);
+            assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]);
+        }
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(getFtpUrl()).to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * {@link SocketFactory} which creates {@link Socket}s that expose statistics about {@link Socket#connect(SocketAddress)}/{@link Socket#close()}
+     * invocations
+     */
+    private class AuditingSocketFactory extends SocketFactory {
+
+        @Override
+        public Socket createSocket(String s, int i) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket(InetAddress inetAddress, int i) throws IOException {
+            return null;
+        }
+
+        @Override
+        public Socket createSocket() throws IOException {
+            AuditingSocket socket = new AuditingSocket();
+            socketAudits.put(System.identityHashCode(socket), new boolean[] { false, false });
+            return socket;
+        }
+
+        @Override
+        public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException {
+            return null;
+        }
+    }
+
+    /**
+     * {@link Socket} which counts connect()/close() invocations
+     */
+    private class AuditingSocket extends Socket {
+
+        @Override
+        public void connect(SocketAddress endpoint, int timeout) throws IOException {
+            log.info("Connecting socket {}", System.identityHashCode(this));
+            super.connect(endpoint, timeout);
+            socketAudits.get(System.identityHashCode(this))[0] = true;
+        }
+
+        @Override
+        public synchronized void close() throws IOException {
+            log.info("Disconnecting socket {}", System.identityHashCode(this));
+            super.close();
+            socketAudits.get(System.identityHashCode(this))[1] = true;
+        }
+    }
+
+}