You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/07/16 13:31:25 UTC

[camel] branch camel-3.4.x updated: Add reconnect during FTPOperation delete and rename

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.4.x by this push:
     new aae93b5  Add reconnect during FTPOperation delete and rename
aae93b5 is described below

commit aae93b567d160b8e33c496d3f4781f0015d7068d
Author: Lukas Holthof <lu...@sap.com>
AuthorDate: Wed Jul 15 22:09:50 2020 +0200

    Add reconnect during FTPOperation delete and rename
    
    Change-Id: Ie97fee70f26792793166c491755665153bc3e746
---
 .../camel/component/file/remote/FtpOperations.java |  19 +++-
 .../FtpConsumerPostProcessingOnDisconnect.java     | 117 +++++++++++++++++++++
 .../file/remote/FtpServerTestSupport.java          |  17 +++
 3 files changed, 152 insertions(+), 1 deletion(-)

diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
index 5837418..0a83906 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
@@ -303,12 +303,13 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
     @Override
     public boolean deleteFile(String name) throws GenericFileOperationFailedException {
         log.debug("Deleting file: {}", name);
-
+        
         boolean result;
         String target = name;
         String currentDir = null;
 
         try {
+            reconnectIfNecessary(null);
             if (endpoint.getConfiguration().isStepwise()) {
                 // remember current directory
                 currentDir = getCurrentDirectory();
@@ -343,6 +344,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
     public boolean renameFile(String from, String to) throws GenericFileOperationFailedException {
         log.debug("Renaming file: {} to: {}", from, to);
         try {
+            reconnectIfNecessary(null);
             return client.rename(from, to);
         } catch (IOException e) {
             throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
@@ -986,5 +988,36 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
     public FTPClient getClient() {
         return client;
     }
+
+    /**
+     * Makes sure the FTP client has a working connection before an operation is sent to the server.
+     * <p/>
+     * When the client reports that it is connected, a NOOP command is sent as a probe. The client is
+     * reconnected when it is not connected, when the probe fails with an {@link IOException}, or when
+     * the server answers the probe negatively ({@code sendNoOp} returns {@code false}).
+     *
+     * @param  exchange the exchange passed on to {@code connect}; the callers visible here pass {@code null}
+     * @throws GenericFileOperationFailedException if thrown by {@code connect} while reconnecting
+     */
+    private void reconnectIfNecessary(Exchange exchange) throws GenericFileOperationFailedException {
+        boolean alive = false;
+        if (isConnected()) {
+            log.trace("sendNoOp to check if connection should be reconnected");
+            try {
+                // a false return is a negative reply and must not be mistaken for a healthy connection
+                alive = client.sendNoOp();
+                if (!alive) {
+                    log.trace("NoOp was rejected by the server, try to reconnect");
+                }
+            } catch (IOException e) {
+                log.trace("NoOp to server failed, try to reconnect");
+            }
+        } else {
+            log.trace("Client is not connected, try to reconnect");
+        }
+        if (!alive) {
+            connect(endpoint.getConfiguration(), exchange);
+        }
+    }
 
 }
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerPostProcessingOnDisconnect.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerPostProcessingOnDisconnect.java
new file mode 100644
index 0000000..8bff7dc
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerPostProcessingOnDisconnect.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Verifies that the FTP consumer still deletes or moves a consumed file when the FTP server
+ * drops all sessions while the exchange is being routed.
+ */
+public class FtpConsumerPostProcessingOnDisconnect extends FtpServerTestSupport {
+    private static final String SAMPLE_FILE_NAME_1 = String.format("sample-1-%s.txt",
+            FtpConsumerPostProcessingOnDisconnect.class.getSimpleName());
+    private static final String SAMPLE_FILE_NAME_2 = String.format("sample-2-%s.txt",
+            FtpConsumerPostProcessingOnDisconnect.class.getSimpleName());
+    private static final String SAMPLE_FILE_CHARSET = "iso-8859-1";
+    private static final String SAMPLE_FILE_PAYLOAD = "abc";
+    // upper bound and poll interval used while waiting for the consumed file to disappear
+    private static final long DELETE_TIMEOUT_MILLIS = 5000;
+    private static final long DELETE_POLL_MILLIS = 50;
+
+    @Test
+    public void testConsumeDelete() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        // prepare sample file to be consumed by FTP consumer
+        createSampleFile(SAMPLE_FILE_NAME_1);
+
+        // Prepare expectations
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(SAMPLE_FILE_PAYLOAD);
+
+        context.getRouteController().startRoute("foo");
+
+        // Check that expectations are satisfied
+        assertMockEndpointsSatisfied();
+
+        // The file may still exist right after the mock is satisfied, so poll for its removal
+        // with an upper bound instead of relying on a single fixed sleep
+        File deletedFile = new File(FTP_ROOT_DIR + "/" + SAMPLE_FILE_NAME_1);
+        long deadline = System.currentTimeMillis() + DELETE_TIMEOUT_MILLIS;
+        while (deletedFile.exists() && System.currentTimeMillis() < deadline) {
+            Thread.sleep(DELETE_POLL_MILLIS);
+        }
+        assertFalse(deletedFile.exists(), "File should have been deleted: " + deletedFile);
+    }
+
+    @Test
+    public void testConsumeMove() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        // location of the file after it has been processed
+        String movedFile = FTP_ROOT_DIR + "/.camel/" + SAMPLE_FILE_NAME_2;
+
+        // prepare sample file to be consumed by FTP consumer
+        createSampleFile(SAMPLE_FILE_NAME_2);
+
+        // Prepare expectations
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(SAMPLE_FILE_PAYLOAD);
+        // use mock to assert that the file will be moved there eventually
+        mock.expectedFileExists(movedFile);
+
+        context.getRouteController().startRoute("bar");
+
+        // Check that expectations are satisfied
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("ftp://admin@localhost:" + getPort() + "?password=admin&delete=true").routeId("foo").noAutoStartup()
+                        .process(disconnectingProcessor()).to("mock:result");
+                from("ftp://admin@localhost:" + getPort() + "?password=admin&noop=false&move=.camel").routeId("bar").noAutoStartup()
+                        .process(disconnectingProcessor()).to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * Creates the processor shared by both routes: it closes every session on the FTP server
+     * while the exchange is routed, i.e. before the consumer deletes or moves the file.
+     */
+    private Processor disconnectingProcessor() {
+        return new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                disconnectAllSessions(); // disconnect all Sessions on FTP server
+            }
+        };
+    }
+
+    private void createSampleFile(String fileName) throws IOException {
+        File file = new File(FTP_ROOT_DIR + "/" + fileName);
+        FileUtils.write(file, SAMPLE_FILE_PAYLOAD, SAMPLE_FILE_CHARSET);
+    }
+
+}
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
index 0585a29..2e1d772 100644
--- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
@@ -17,7 +17,10 @@
 package org.apache.camel.component.file.remote;
 
 import java.io.File;
+import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.util.ObjectHelper;
@@ -26,6 +29,9 @@ import org.apache.ftpserver.FtpServer;
 import org.apache.ftpserver.FtpServerFactory;
 import org.apache.ftpserver.filesystem.nativefs.NativeFileSystemFactory;
 import org.apache.ftpserver.ftplet.UserManager;
+import org.apache.ftpserver.impl.DefaultFtpServer;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.listener.Listener;
 import org.apache.ftpserver.listener.ListenerFactory;
 import org.apache.ftpserver.usermanager.ClearTextPasswordEncryptor;
 import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
@@ -138,5 +144,21 @@ public abstract class FtpServerTestSupport extends BaseServerTestSupport {
     public void sendFile(String url, Object body, String fileName) {
         template.sendBodyAndHeader(url, body, Exchange.FILE_NAME, simple(fileName));
     }
+
+    /**
+     * Closes every session that is currently active on any listener of the FTP server used by
+     * the test. Only the sessions are closed; this method does not stop the listeners.
+     */
+    protected void disconnectAllSessions() throws IOException {
+        DefaultFtpServer server = (DefaultFtpServer) ftpServer;
+        Map<String, Listener> listenersByName = server.getListeners();
+        for (Listener listener : listenersByName.values()) {
+            // close each client session that this listener currently reports as active
+            Set<FtpIoSession> activeSessions = listener.getActiveSessions();
+            for (FtpIoSession activeSession : activeSessions) {
+                activeSession.closeNow();
+            }
+        }
+    }
 
 }