You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/06 12:36:04 UTC
svn commit: r1198340 - in /camel/trunk/components/camel-ftp/src:
main/java/org/apache/camel/component/file/remote/
test/java/org/apache/camel/component/file/remote/
Author: davsclaus
Date: Sun Nov 6 11:36:04 2011
New Revision: 1198340
URL: http://svn.apache.org/viewvc?rev=1198340&view=rev
Log:
CAMEL-4010: FTP consumer should process UoW synchronously to ensure done work is done in the same thread, as the FTP libraries are not thread safe.
Added:
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
Modified:
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java?rev=1198340&r1=1198339&r2=1198340&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java Sun Nov 6 11:36:04 2011
@@ -193,11 +193,13 @@ public class FtpOperations implements Re
public void disconnect() throws GenericFileOperationFailedException {
// logout before disconnecting
try {
+ log.trace("Client logout");
client.logout();
} catch (IOException e) {
throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
} finally {
try {
+ log.trace("Client disconnect");
client.disconnect();
} catch (IOException e) {
throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
@@ -228,6 +230,7 @@ public class FtpOperations implements Re
}
// delete the file
+ log.trace("Client deleteFile: {}", target);
result = client.deleteFile(target);
// change back to previous directory
@@ -321,6 +324,7 @@ public class FtpOperations implements Re
remoteName = FileUtil.stripPath(name);
}
+ log.trace("Client retrieveFile: {}", remoteName);
result = client.retrieveFile(remoteName, os);
// change back to current directory
@@ -403,6 +407,7 @@ public class FtpOperations implements Re
remoteName = FileUtil.stripPath(name);
}
+ log.trace("Client retrieveFile: {}", remoteName);
result = client.retrieveFile(remoteName, os);
// change back to current directory
@@ -496,8 +501,10 @@ public class FtpOperations implements Re
try {
is = exchange.getIn().getMandatoryBody(InputStream.class);
if (endpoint.getFileExist() == GenericFileExist.Append) {
+ log.trace("Client appendFile: {}", targetName);
return client.appendFile(targetName, is);
} else {
+ log.trace("Client storeFile: {}", targetName);
return client.storeFile(targetName, is);
}
} catch (IOException e) {
@@ -542,7 +549,7 @@ public class FtpOperations implements Re
}
protected boolean fastExistsFile(String name) throws GenericFileOperationFailedException {
- log.trace("fastexistsFile({})", name);
+ log.trace("fastExistsFile({})", name);
try {
String[] names = client.listNames(name);
if (names == null) {
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1198340&r1=1198339&r2=1198340&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Sun Nov 6 11:36:04 2011
@@ -18,6 +18,7 @@ package org.apache.camel.component.file.
import java.io.IOException;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileConsumer;
@@ -86,6 +87,14 @@ public abstract class RemoteFileConsumer
}
@Override
+ protected void processExchange(Exchange exchange) {
+ // mark the exchange so its unit of work is processed synchronously, as the FTP client is not thread safe
+ // and the callbacks must therefore be executed in the same thread as this consumer
+ exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE);
+ super.processExchange(exchange);
+ }
+
+ @Override
protected void doStop() throws Exception {
super.doStop();
disconnect();
Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java?rev=1198340&view=auto
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java (added)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java Sun Nov 6 11:36:04 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+/**
+ * Tests that an FTP consumer using delete=true still removes the consumed files when the route contains an asynchronous processor.
+ */
+public class FromFtpAsyncProcessTest extends FtpServerTestSupport {
+
+ private String getFtpUrl() {
+ return "ftp://admin@localhost:" + getPort() + "/async/?password=admin&delete=true";
+ }
+
+ @Test
+ public void testFtpAsyncProcess() throws Exception {
+ template.sendBodyAndHeader("file:res/home/async", "Hello World", Exchange.FILE_NAME, "hello.txt");
+ template.sendBodyAndHeader("file:res/home/async", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+ getMockEndpoint("mock:result").expectedMessageCount(2);
+ getMockEndpoint("mock:result").expectedHeaderReceived("foo", 123);
+
+ // the log file should show that all the FTP client work is done in the same thread (fully synchronous),
+ // as the FTP client is not thread safe and must be used fully synchronously
+
+ context.startRoute("foo");
+
+ assertMockEndpointsSatisfied();
+
+ // give the FTP server time to delete the files before checking that they are gone
+ Thread.sleep(1000);
+
+ File hello = new File("res/home/async/hello.txt");
+ assertFalse("File should not exist " + hello, hello.exists());
+
+ File bye = new File("res/home/async/bye.txt");
+ assertFalse("File should not exist " + bye, bye.exists());
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from(getFtpUrl()).routeId("foo").noAutoStartup()
+ .process(new MyAsyncProcessor())
+ .to("mock:result");
+ }
+ };
+ }
+
+ private class MyAsyncProcessor implements AsyncProcessor {
+
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ @Override
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore: the sleep only delays the callback, so continue and complete the exchange anyway
+ }
+
+ exchange.getIn().setHeader("foo", 123);
+ callback.done(false);
+ }
+ });
+
+ return false;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // noop: intentionally empty, only the process(Exchange, AsyncCallback) variant above does any work
+ }
+ }
+
+
+}