You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gg...@apache.org on 2015/05/07 20:09:25 UTC
camel git commit: [CAMEL-8718] Prevent FTP(s) connection leak after failed auth (producer&consumer)
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x 1b3867a6b -> 8c6dce78c
[CAMEL-8718] Prevent FTP(s) connection leak after failed auth (producer&consumer)
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8c6dce78
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8c6dce78
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8c6dce78
Branch: refs/heads/camel-2.15.x
Commit: 8c6dce78c11c91d070ba1894c0a4d71336ad4e03
Parents: 1b3867a
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Thu May 7 20:08:35 2015 +0200
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Thu May 7 20:08:47 2015 +0200
----------------------------------------------------------------------
.../component/file/remote/FtpOperations.java | 2 +
.../file/remote/RemoteFileConsumer.java | 11 --
.../remote/FtpBadLoginConnectionLeakTest.java | 18 +--
...FtpBadLoginInProducerConnectionLeakTest.java | 124 +++++++++++++++
.../FtpBadLoginMockNoopConnectionLeakTest.java | 152 +++++++++++++++++++
5 files changed, 285 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8c6dce78/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
index 74dbf6f..0528362 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
@@ -175,6 +175,8 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
}
log.trace("User {} logged in: {}", username != null ? username : "anonymous", login);
if (!login) {
+ // disconnect to prevent connection leaks
+ client.disconnect();
throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString());
}
client.setFileType(configuration.isBinary() ? FTP.BINARY_FILE_TYPE : FTP.ASCII_FILE_TYPE);
http://git-wip-us.apache.org/repos/asf/camel/blob/8c6dce78/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index bf28f14..2f58c93 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -196,17 +196,6 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
}
}
- try {
- // we may as well be connected, but not logged in. let's disconnect to prevent connection leak
- if (!isConnected && getOperations().isConnected()) {
- getOperations().disconnect();
- }
- } catch (Exception ex) {
- if (log.isDebugEnabled()) {
- log.debug("Exception during disconnect: " + ex.getMessage());
- }
- }
-
if (!loggedIn || !isConnected) {
if (log.isDebugEnabled()) {
log.debug("Not connected/logged in, connecting to: {}", remoteServer());
http://git-wip-us.apache.org/repos/asf/camel/blob/8c6dce78/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
index 54b1574..ff430ee 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java
@@ -22,7 +22,6 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
import org.apache.camel.builder.RouteBuilder;
@@ -32,24 +31,20 @@ import org.junit.Test;
public class FtpBadLoginConnectionLeakTest extends FtpServerTestSupport {
- private final AtomicInteger exceptionCount = new AtomicInteger(0);
-
- private String getFtpUrl() {
- return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=0" +
- "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
- }
-
/**
* Mapping of socket hashcode to two element tab ([connect() called, close() called])
*/
private Map<Integer, boolean[]> socketAudits = new HashMap<>();
- @Override
+ private String getFtpUrl() {
+ return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber" +
+ "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
+ }
+ @Override
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry jndi = super.createRegistry();
- final SocketFactory defaultSocketFactory = SocketFactory.getDefault();
SocketFactory sf = new AuditingSocketFactory();
jndi.bind("sf", sf);
return jndi;
@@ -67,12 +62,13 @@ public class FtpBadLoginConnectionLeakTest extends FtpServerTestSupport {
for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) {
assertTrue("Socket should be connected", socketStats.getValue()[0]);
- assertEquals("Socket should be closed", socketStats.getValue()[1], socketStats.getValue()[0]);
+ assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]);
}
mock.assertIsSatisfied();
}
+ @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
http://git-wip-us.apache.org/repos/asf/camel/blob/8c6dce78/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java
new file mode 100644
index 0000000..e3314d0
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.SocketFactory;
+
+import org.apache.camel.component.file.GenericFileOperationFailedException;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class FtpBadLoginInProducerConnectionLeakTest extends FtpServerTestSupport {
+
+ /**
+ * Mapping of socket identity hash code to a two-element array ([connect() called, close() called])
+ */
+ private Map<Integer, boolean[]> socketAudits = new HashMap<>();
+
+ private String getFtpUrl() {
+ return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=3" +
+ "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+
+ SocketFactory sf = new AuditingSocketFactory();
+ jndi.bind("sf", sf);
+ return jndi;
+ }
+
+ @Test
+ public void testConnectionLeak() throws Exception {
+ for (String filename : new String[] { "claus.txt", "grzegorz.txt" }) {
+ try {
+ sendFile(getFtpUrl(), "Hello World", filename);
+ } catch (Exception ignored) {
+ // expected
+ }
+ }
+
+ // maximumReconnectAttempts is related to TCP connects, not to FTP login attempts
+ // but having this parameter > 0 leads to two connection attempts
+ assertEquals("Expected 4 socket connections to be created", 4, socketAudits.size());
+
+ for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) {
+ assertTrue("Socket should be connected", socketStats.getValue()[0]);
+ assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]);
+ }
+ }
+
+ /**
+ * {@link SocketFactory} which creates {@link Socket}s that expose statistics about {@link Socket#connect(SocketAddress)}/{@link Socket#close()}
+ * invocations
+ */
+ private class AuditingSocketFactory extends SocketFactory {
+
+ @Override
+ public Socket createSocket(String s, int i) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Socket createSocket() throws IOException {
+ AuditingSocket socket = new AuditingSocket();
+ socketAudits.put(System.identityHashCode(socket), new boolean[] { false, false });
+ return socket;
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException {
+ return null;
+ }
+ }
+
+ /**
+ * {@link Socket} which counts connect()/close() invocations
+ */
+ private class AuditingSocket extends Socket {
+
+ @Override
+ public void connect(SocketAddress endpoint, int timeout) throws IOException {
+ super.connect(endpoint, timeout);
+ socketAudits.get(System.identityHashCode(this))[0] = true;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ super.close();
+ socketAudits.get(System.identityHashCode(this))[1] = true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8c6dce78/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java
new file mode 100644
index 0000000..0b895f5
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.SocketFactory;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.commons.net.ftp.FTPClient;
+import org.junit.Test;
+
+/**
+ * Test which checks leaking connections when FTP server returns correct status for NOOP operation.
+ */
+public class FtpBadLoginMockNoopConnectionLeakTest extends FtpServerTestSupport {
+
+ private String getFtpUrl() {
+ return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=3" +
+ "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf";
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl(), FtpEndpoint.class);
+ endpoint.setFtpClient(new FTPClient() {
+ @Override
+ public boolean sendNoOp() throws IOException {
+ // return true as long as connection is established
+ return this.isConnected();
+ }
+ });
+ }
+
+ /**
+ * Mapping of socket identity hash code to a two-element array ([connect() called, close() called])
+ */
+ private Map<Integer, boolean[]> socketAudits = new HashMap<>();
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+
+ SocketFactory sf = new AuditingSocketFactory();
+ jndi.bind("sf", sf);
+ return jndi;
+ }
+
+ @Test
+ public void testConnectionLeak() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+
+ // let's have several login attempts
+ Thread.sleep(3000L);
+
+ stopCamelContext();
+
+ for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) {
+ assertTrue("Socket should be connected", socketStats.getValue()[0]);
+ assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]);
+ }
+
+ mock.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from(getFtpUrl()).to("mock:result");
+ }
+ };
+ }
+
+ /**
+ * {@link SocketFactory} which creates {@link Socket}s that expose statistics about {@link Socket#connect(SocketAddress)}/{@link Socket#close()}
+ * invocations
+ */
+ private class AuditingSocketFactory extends SocketFactory {
+
+ @Override
+ public Socket createSocket(String s, int i) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Socket createSocket() throws IOException {
+ AuditingSocket socket = new AuditingSocket();
+ socketAudits.put(System.identityHashCode(socket), new boolean[] { false, false });
+ return socket;
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException {
+ return null;
+ }
+ }
+
+ /**
+ * {@link Socket} which counts connect()/close() invocations
+ */
+ private class AuditingSocket extends Socket {
+
+ @Override
+ public void connect(SocketAddress endpoint, int timeout) throws IOException {
+ log.info("Connecting socket {}", System.identityHashCode(this));
+ super.connect(endpoint, timeout);
+ socketAudits.get(System.identityHashCode(this))[0] = true;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ log.info("Disconnecting socket {}", System.identityHashCode(this));
+ super.close();
+ socketAudits.get(System.identityHashCode(this))[1] = true;
+ }
+ }
+
+}