You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/06 12:36:04 UTC

svn commit: r1198340 - in /camel/trunk/components/camel-ftp/src: main/java/org/apache/camel/component/file/remote/ test/java/org/apache/camel/component/file/remote/

Author: davsclaus
Date: Sun Nov  6 11:36:04 2011
New Revision: 1198340

URL: http://svn.apache.org/viewvc?rev=1198340&view=rev
Log:
CAMEL-4010: FTP consumer should process the UoW synchronously to ensure the done work is performed on the same thread, as the FTP libraries are not thread safe.

Added:
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
Modified:
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java?rev=1198340&r1=1198339&r2=1198340&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java Sun Nov  6 11:36:04 2011
@@ -193,11 +193,13 @@ public class FtpOperations implements Re
     public void disconnect() throws GenericFileOperationFailedException {
         // logout before disconnecting
         try {
+            log.trace("Client logout");
             client.logout();
         } catch (IOException e) {
             throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
         } finally {
             try {
+                log.trace("Client disconnect");
                 client.disconnect();
             } catch (IOException e) {
                 throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
@@ -228,6 +230,7 @@ public class FtpOperations implements Re
             }
 
             // delete the file
+            log.trace("Client deleteFile: {}", target);
             result = client.deleteFile(target);
 
             // change back to previous directory
@@ -321,6 +324,7 @@ public class FtpOperations implements Re
                 remoteName = FileUtil.stripPath(name);
             }
 
+            log.trace("Client retrieveFile: {}", remoteName);
             result = client.retrieveFile(remoteName, os);
 
             // change back to current directory
@@ -403,6 +407,7 @@ public class FtpOperations implements Re
                 remoteName = FileUtil.stripPath(name);
             }
 
+            log.trace("Client retrieveFile: {}", remoteName);
             result = client.retrieveFile(remoteName, os);
 
             // change back to current directory
@@ -496,8 +501,10 @@ public class FtpOperations implements Re
         try {
             is = exchange.getIn().getMandatoryBody(InputStream.class);
             if (endpoint.getFileExist() == GenericFileExist.Append) {
+                log.trace("Client appendFile: {}", targetName);
                 return client.appendFile(targetName, is);
             } else {
+                log.trace("Client storeFile: {}", targetName);
                 return client.storeFile(targetName, is);
             }
         } catch (IOException e) {
@@ -542,7 +549,7 @@ public class FtpOperations implements Re
     }
 
     protected boolean fastExistsFile(String name) throws GenericFileOperationFailedException {
-        log.trace("fastexistsFile({})", name);
+        log.trace("fastExistsFile({})", name);
         try {
             String[] names = client.listNames(name);
             if (names == null) {

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1198340&r1=1198339&r2=1198340&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Sun Nov  6 11:36:04 2011
@@ -18,6 +18,7 @@ package org.apache.camel.component.file.
 
 import java.io.IOException;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileConsumer;
@@ -86,6 +87,14 @@ public abstract class RemoteFileConsumer
     }
 
     @Override
+    protected void processExchange(Exchange exchange) {
+        // flag the exchange so its unit of work is processed synchronously: the ftp client is not thread safe,
+        // so the callbacks must be executed in the same thread as this consumer
+        exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE);
+        super.processExchange(exchange);
+    }
+
+    @Override
     protected void doStop() throws Exception {
         super.doStop();
         disconnect();

Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java?rev=1198340&view=auto
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java (added)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpAsyncProcessTest.java Sun Nov  6 11:36:04 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+/**
+ * Tests that the FTP consumer still deletes the consumed files when the route is continued by another thread.
+ */
+public class FromFtpAsyncProcessTest extends FtpServerTestSupport {
+
+    private String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + "/async/?password=admin&delete=true";
+    }
+
+    @Test
+    public void testFtpAsyncProcess() throws Exception {
+        template.sendBodyAndHeader("file:res/home/async", "Hello World", Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file:res/home/async", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+        getMockEndpoint("mock:result").expectedMessageCount(2);
+        getMockEndpoint("mock:result").expectedHeaderReceived("foo", 123);
+
+        // the log file should show that all the ftp client work is done in the same thread (fully synchronous)
+        // as the ftp client is not thread safe and must be used fully synchronously
+
+        context.startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+
+        // give time for files to be deleted on ftp server
+        Thread.sleep(1000);
+
+        File hello = new File("res/home/async/hello.txt");
+        assertFalse("File should not exist " + hello, hello.exists());
+
+        File bye = new File("res/home/async/bye.txt");
+        assertFalse("File should not exist " + bye, bye.exists());
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(getFtpUrl()).routeId("foo").noAutoStartup()
+                    .process(new MyAsyncProcessor())
+                    .to("mock:result");
+            }
+        };
+    }
+
+    private static class MyAsyncProcessor implements AsyncProcessor {
+        // completes each exchange from a separate single-thread executor after a delay, not from the calling thread
+        private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        @Override
+        public boolean process(final Exchange exchange, final AsyncCallback callback) {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
+                    }
+
+                    exchange.getIn().setHeader("foo", 123);
+                    callback.done(false);
+                }
+            });
+
+            return false;
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // noop - the test relies on the asynchronous variant above, which sets the foo header
+        }
+    }
+
+
+}